mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
23.01.2014 bf7ffad243778cef715606b14a0f9651910095ee
Actual merge of complete changelog.file test package,
which contains test cases for file-based changelog
6 files added
1 files modified
2818 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java 552 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java 296 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 602 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java 441 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 598 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java 327 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -325,7 +325,7 @@
    {
      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
      return new ByteStringBuilder()
        .append(record.getKey())
        .append((long) record.getKey())
        .append(cnIndexRecord.getBaseDN().toString())
        .append(STRING_SEPARATOR)
        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
New file
@@ -0,0 +1,552 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.forgerock.opendj.ldap.ByteSequenceReader;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.Pair;
@SuppressWarnings("javadoc")
public class BlockLogReaderWriterTest extends DirectoryServerTestCase
{
  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
  private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
  private static final int INT_RECORD_SIZE = 12;
  @BeforeClass
  void createTestDirectory()
  {
    TEST_DIRECTORY.mkdirs();
  }
  @BeforeMethod
  void ensureTestFileIsEmpty() throws Exception
  {
    StaticUtils.recursiveDelete(TEST_FILE);
  }
  @AfterClass
  void cleanTestDirectory()
  {
    StaticUtils.recursiveDelete(TEST_DIRECTORY);
  }
  @DataProvider(name = "recordsData")
  Object[][] recordsData()
  {
    return new Object[][]
    {
      // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
      // size of block, expected size of file after all records are written, records
      { 12, 12, records(1) }, // zero block marker
      { 10, 16, records(1) }, // one block marker
      { 8, 16, records(1) },  // one block marker
      { 7, 20, records(1) },  // two block markers
      { 6, 24, records(1) },  // three block markers
      { 5, 40, records(1) },  // seven block markers
      { 16, 28, records(1,2) }, // one block marker
      { 12, 32, records(1,2) }, // two block markers
      { 10, 36, records(1,2) }, // three block markers
    };
  }
  /**
   * Tests that records can be written then read correctly for different block sizes.
   */
  @Test(dataProvider="recordsData")
  public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
      throws Exception
  {
    writeRecords(blockSize, records);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      for (int i = 0; i < records.size(); i++)
      {
         Record<Integer, Integer> record = reader.readRecord();
         assertThat(record).isEqualTo(records.get(i));
      }
      assertThat(reader.readRecord()).isNull();
      assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @DataProvider(name = "recordsForSeek")
  Object[][] recordsForSeek()
  {
    Object[][] data = new Object[][] {
      // records, key, key matching strategy, position strategy, expectedRecord, should be found ?
      // no record
      { records(), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      // 1 record
      { records(1), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
      { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      // 3 records equal matching
      { records(1,2,3), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3), 2, EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
      { records(1,2,3), 3, EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
      { records(1,2,3), 4, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1,2,3), 2, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
      { records(1,2,3), 3, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      // 3 records less than or equal matching
      { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
      { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
      { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
      { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
      { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
      { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      // 3 records greater or equal matching
      { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
      { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
      { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
      { records(1,2,3), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
      { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
      { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1,2,3), 4, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      // 10 records equal matching
      { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
      { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
      { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      // 10 records less than or equal matching
      { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
      { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
      { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
      { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      // 10 records greater or equal matching
      { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
      { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(7), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
      { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
    };
    // For each test case, do a test with various block sizes to ensure algorithm is not broken
    // on a given size
    int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
    Object[][] finalData = new Object[sizes.length * data.length][7];
    for (int i = 0; i < data.length; i++)
    {
      for (int j = 0; j < sizes.length; j++)
      {
        Object[] a = data[i];
        // add the block size at beginning of each test case
        finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4], a[5]};
      }
    }
    return finalData;
  }
  @Test(dataProvider = "recordsForSeek")
  public void testSeekToRecord(int blockSize, List<Record<Integer, Integer>> records, int key,
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, Record<Integer, Integer> expectedRecord,
      boolean shouldBeFound) throws Exception
  {
    writeRecords(blockSize, records);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy);
      assertThat(result.getFirst()).isEqualTo(shouldBeFound);
      assertThat(result.getSecond()).isEqualTo(expectedRecord);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @Test
  public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
  {
    final int blockSize = 10;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
  }
  @Test
  public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
  {
    final int blockSize = 10;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
  }
  @Test
  public void testSearchClosestMarkerToKey() throws Exception
  {
    int blockSize = 20;
    writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
      assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
      assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
      assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
      assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
      assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
      assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
      assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
      assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
      assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
      assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
      assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
      assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
      // out of reach keys
      assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
      assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @Test
  public void testLengthOfStoredRecord() throws Exception
  {
    final int blockSize = 100;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    int recordLength = 10;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    recordLength = 150;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    recordLength = 200;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
  }
  /**
   * This test is intended to be run only manually to check the performance between binary search
   * and sequential access.
   * Note that sequential run may be extremely long when using high values.
   */
  @Test(enabled=false)
  public void seekPerformanceComparison() throws Exception
  {
    // You may change these values
    long fileSizeInBytes = 100*1024*1024;
    int numberOfValuesToSeek = 50000;
    int blockSize = 256;
    writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
      System.out.println("File size: " + TEST_FILE.length() + " bytes");
      System.out.println("\n---- BINARY SEARCH");
      long minTime = Long.MAX_VALUE;
      long maxTime = Long.MIN_VALUE;
      final long t0 = System.nanoTime();
      for (Integer key : keysToSeek)
      {
        final long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result =
            reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
        final long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
        if (te > maxTime) maxTime = te;
        // show time for seeks that last more than N microseconds (tune as needed)
        if (te/1000 > 1000)
        {
          System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
        }
        assertThat(result.getSecond()).isEqualTo(record(key));
      }
      System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
      System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
      System.out.println("\n---- SEQUENTIAL SEARCH");
      minTime = Long.MAX_VALUE;
      maxTime = Long.MIN_VALUE;
      final long t1 = System.nanoTime();
      for (Integer val : keysToSeek)
      {
        long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result =
            reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
        assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
        long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
        if (te > maxTime) maxTime = te;
      }
      System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
      System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  /** Write provided records with the provided block size. */
  private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
  {
    BlockLogWriter<Integer, Integer> writer = null;
    try
    {
      writer = newWriter(blockSize);
      for (Record<Integer, Integer> record : records)
      {
        writer.write(record);
      }
    }
    finally
    {
      StaticUtils.close(writer);
    }
  }
  /** Write as many records as needed to reach provided file size. Records goes from 1 up to N. */
  private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
  {
      final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
      final int[] values = new int[numberOfValues];
      for (int i = 0; i < numberOfValues; i++)
      {
        values[i] = i+1;
      }
      writeRecords(blockSize, records(values));
  }
  /** Returns provided number of keys to seek in random order, for a file of provided size. */
  private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
  {
    final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
    final List<Integer> values = new ArrayList<Integer>(numberOfValues);
    for (int i = 0; i < numberOfValues; i++)
    {
      values.add(i+1);
    }
    Collections.shuffle(values);
    return values.subList(0, numberOfKeys);
  }
  private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
  {
    return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
  }
  private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
  {
    return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
        RECORD_PARSER, blockSize);
  }
  private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
  {
    return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
  }
  /** Helper to build a list of records. */
  private List<Record<Integer, Integer>> records(int...keys)
  {
    List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
    for (int key : keys)
    {
      records.add(Record.from(key, key));
    }
    return records;
  }
  /** Helper to build a record. */
  private Record<Integer, Integer> record(int key)
  {
    return Record.from(key, key);
  }
  /**
   * Record parser implementation for records with keys and values as integers to be used in tests.
   * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value),
   * which is useful for some tests.
   */
  private static class IntRecordParser implements RecordParser<Integer, Integer>
  {
    public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
    {
      ByteSequenceReader reader = data.asReader();
      int key = reader.getInt();
      int value = reader.getInt();
      return Record.from(key, value);
    }
    public ByteString encodeRecord(Record<Integer, Integer> record)
    {
      return new ByteStringBuilder().append((int) record.getKey()).append((int) record.getValue()).toByteString();
    }
    @Override
    public Integer decodeKeyFromString(String key) throws ChangelogException
    {
      return Integer.valueOf(key);
    }
    @Override
    public String encodeKeyToString(Integer key)
    {
      return String.valueOf(key);
    }
    /** {@inheritDoc} */
    @Override
    public Integer getMaxKey()
    {
      return Integer.MAX_VALUE;
    }
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
New file
@@ -0,0 +1,296 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
import static org.testng.Assert.*;
@SuppressWarnings("javadoc")
public class FileChangeNumberIndexDBTest extends ReplicationTestCase
{
  @DataProvider(name = "messages")
  Object[][] createMessages() throws Exception
  {
    CSN[] csns = generateCSNs(1, 0, 3);
    DN dn1 = DN.valueOf("o=test1");
    return new Object[][] {
      { new ChangeNumberIndexRecord(0L, dn1, csns[1]) },
      { new ChangeNumberIndexRecord(999L, dn1, csns[2]) },
    };
  }
  @Test(dataProvider="messages")
  public void testRecordParser(ChangeNumberIndexRecord msg) throws Exception
  {
    RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
    ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
    Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
    assertThat(record).isNotNull();
    assertThat(record.getKey()).isEqualTo(msg.getChangeNumber());
    assertThat(record.getValue().getBaseDN()).isEqualTo(msg.getBaseDN());
    assertThat(record.getValue().getCSN()).isEqualTo(msg.getCSN());
  }
  @Test()
  public void testAddAndReadRecords() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      replicationServer = newReplicationServer();
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0);
      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long[] changeNumbers = addThreeRecords(cnIndexDB);
      long cn1 = changeNumbers[0];
      long cn2 = changeNumbers[1];
      long cn3 = changeNumbers[2];
      // Checks
      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
      assertEquals(cnIndexDB.count(), 3, "Db count");
      assertFalse(cnIndexDB.isEmpty());
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
      cursor = cnIndexDB.getCursorFrom(cn2);
      assertCursorReadsInOrder(cursor, cn2, cn3);
      cursor = cnIndexDB.getCursorFrom(cn3);
      assertCursorReadsInOrder(cursor, cn3);
    }
    finally
    {
      remove(replicationServer);
    }
  }
  @Test()
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      replicationServer = newReplicationServer();
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0);
      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      addThreeRecords(cnIndexDB);
      cnIndexDB.clear();
      assertNull(cnIndexDB.getOldestRecord());
      assertNull(cnIndexDB.getNewestRecord());
      assertEquals(cnIndexDB.count(), 0);
      assertTrue(cnIndexDB.isEmpty());
    }
    finally
    {
      remove(replicationServer);
    }
  }
  /**
   * This test makes basic operations of a ChangeNumberIndexDB:
   * <ol>
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>set a very short trim period</li>
   * <li>wait for the db to be trimmed / here since the changes are not stored
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  // TODO : this works only if we ensure that there is a rotation of ahead log file
  // at the right place. First two records are 37 and 76 bytes long,
  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
  // Re-enable this test when max file size is customizable for log
  @Test(enabled=false)
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      replicationServer = newReplicationServer();
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0); // disable purging
      // Prepare data to be stored in the db
      DN baseDN1 = DN.valueOf("o=test1");
      DN baseDN2 = DN.valueOf("o=test2");
      DN baseDN3 = DN.valueOf("o=test3");
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
                 addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      // The ChangeNumber should not get purged
      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
      assertEquals(oldestCN, cn1);
      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
      try
      {
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
        assertFalse(cursor.next());
      }
      finally
      {
        StaticUtils.close(cursor);
      }
      // Now test that purging removes all changes but the last one
      changelogDB.setPurgeDelay(1);
      int count = 0;
      while (cnIndexDB.count() > 1 && count < 100)
      {
        Thread.sleep(10);
        count++;
      }
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
    }
    finally
    {
      remove(replicationServer);
    }
  }
  private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
  {
    // Prepare data to be stored in the db
    DN baseDN1 = DN.valueOf("o=test1");
    DN baseDN2 = DN.valueOf("o=test2");
    DN baseDN3 = DN.valueOf("o=test3");
    CSN[] csns = generateCSNs(1, 0, 3);
    // Add records
    long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
    long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
    long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
    return new long[] { cn1, cn2, cn3 };
  }
  private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
  {
    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
  }
  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
  {
    assertEquals(record.getCSN(), csn);
    assertEquals(record.getBaseDN(), baseDN);
  }
  private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
  {
    final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
    final FileChangeNumberIndexDB cnIndexDB =
        (FileChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
    assertTrue(cnIndexDB.isEmpty());
    return cnIndexDB;
  }
  /**
   * The newest record is no longer cleared to ensure persistence to the last
   * generated change number across server restarts.
   */
  private void assertOnlyNewestRecordIsLeft(FileChangeNumberIndexDB cnIndexDB,
      int newestChangeNumber) throws ChangelogException
  {
    assertEquals(cnIndexDB.count(), 1);
    assertFalse(cnIndexDB.isEmpty());
    final ChangeNumberIndexRecord oldest = cnIndexDB.getOldestRecord();
    final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
    assertEquals(oldest.getChangeNumber(), newestChangeNumber);
    assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
    assertEquals(oldest.getBaseDN(), newest.getBaseDN());
    assertEquals(oldest.getCSN(), newest.getCSN());
  }
  private ReplicationServer newReplicationServer() throws Exception
  {
    TestCaseUtils.startServer();
    final int port = TestCaseUtils.findFreePort();
    final ReplServerFakeConfiguration cfg =
        new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null);
    cfg.setComputeChangeNumber(true);
    return new ReplicationServer(cfg);
  }
  private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
      long... cns) throws ChangelogException
  {
    try
    {
      for (long cn : cns)
      {
        assertTrue(cursor.next());
        assertEquals(cursor.getRecord().getChangeNumber(), cn);
      }
      assertFalse(cursor.next());
    }
    finally
    {
      cursor.close();
    }
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
New file
@@ -0,0 +1,602 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import org.assertj.core.api.SoftAssertions;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Test the FileReplicaDB class
 */
@SuppressWarnings("javadoc")
public class FileReplicaDBTest extends ReplicationTestCase
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private DN TEST_ROOT_DN;
  /**
   * Utility - log debug message - highlight it is from the test and not
   * from the server code. Makes easier to observe the test steps.
   */
  private void debugInfo(String tn, String s)
  {
    logger.trace("** TEST %s ** %s", tn, s);
  }
  @BeforeClass
  public void setup() throws Exception
  {
    TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
  }
  @DataProvider(name = "messages")
  Object[][] createMessages()
  {
    CSN[] csns = generateCSNs(1, 0, 2);
    return new Object[][] {
      { new DeleteMsg(TEST_ROOT_DN, csns[0], "uid") },
      { new DeleteMsg(TEST_ROOT_DN, csns[1], "uid") },
    };
  }
  @Test(dataProvider="messages")
  public void testRecordParser(UpdateMsg msg) throws Exception
  {
    RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
    ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg));
    Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
    assertThat(record).isNotNull();
    assertThat(record.getKey()).isEqualTo(msg.getCSN());
    assertThat(record.getValue()).isEqualTo(msg);
  }
  @Test
  public void testDomainDNWithForwardSlashes() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 1);
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      waitChangesArePersisted(replicaDB, 1);
      assertFoundInOrder(replicaDB, csns[0]);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  @Test
  public void testAddAndReadRecords() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 5);
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
      assertLimits(replicaDB, csns[0], csns[2]);
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
      replicaDB.add(update4);
      waitChangesArePersisted(replicaDB, 4);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  @Test
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      waitChangesArePersisted(replicaDB, 4);
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  @DataProvider
  Object[][] dataForTestsWithCursorReinitialization()
  {
    return new Object[][] {
      // the index to use in CSN array for the start key of the cursor
      { 0 },
      { 1 },
      { 4 },
    };
  }
  @Test(dataProvider="dataForTestsWithCursorReinitialization")
  public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey],
          GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
      for (int i : indicesToAdd)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
      }
      waitChangesArePersisted(replicaDB, 4);
      for (int i = csnIndexForStartKey+1; i < 5; i++)
      {
        if (i != 3)
        {
          assertTrue(cursor.next());
          assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i);
        }
      }
      assertFalse(cursor.next());
    }
    finally
    {
      close(cursor);
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  // TODO : this works only if we ensure that there is a rotation of ahead log file
  // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
  // the last record alone in the ahead log file
  // Re-enable this test when max file size is customizable for log
  @Test(enabled=false)
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 5);
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"));
      waitChangesArePersisted(replicaDB, 4);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      int count = 0;
      boolean purgeSucceeded = false;
      final CSN expectedNewestCSN = csns[3];
      do
      {
        Thread.sleep(10);
        final CSN oldestCSN = replicaDB.getOldestCSN();
        final CSN newestCSN = replicaDB.getNewestCSN();
        purgeSucceeded =
            oldestCSN.equals(expectedNewestCSN)
                && newestCSN.equals(expectedNewestCSN);
        count++;
      }
      while (!purgeSucceeded && count < 100);
      assertTrue(purgeSucceeded);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  /**
   * Test the feature of clearing a FileReplicaDB used by a replication server.
   * The clear feature is used when a replication server receives a request to
   * reset the generationId of a given domain.
   */
  @Test
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add the changes and check they are here
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      assertLimits(replicaDB, csns[0], csns[2]);
      // Clear DB and check it is cleared.
      replicaDB.clear();
      assertLimits(replicaDB, null, null);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isTrue();
      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * Test the logic that manages counter records in the FileReplicaDB in order to
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(groups = { "opendj-256" })
  public void testGetOldestNewestCSNs() throws Exception
  {
    // It's worth testing with 2 different setting for counterRecord
    // - a counter record is put every 10 Update msg in the db - just a unit
    //   setting.
    // - a counter record is put every 1000 Update msg in the db - something
    //   closer to real setting.
    // In both cases, we want to test the counting algorithm,
    // - when start and stop are before the first counter record,
    // - when start and stop are before and after the first counter record,
    // - when start and stop are after the first counter record,
    // - when start and stop are before and after more than one counter record,
    // After a purge.
    // After shutting down/closing and reopening the db.
    // TODO : do we need the management of counter records ?
    // Use unreachable limits for now because it is not implemented
    testGetOldestNewestCSNs(40, 100);
    testGetOldestNewestCSNs(4000, 10000);
  }
  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
  {
    String tn = "testDBCount("+max+","+counterWindow+")";
    debugInfo(tn, "Starting test");
    File testRoot = null;
    ReplicationServer replicationServer = null;
    ReplicationEnvironment dbEnv = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      testRoot = createCleanDir();
      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      // Populate the db with 'max' msg
      int mySeqnum = 1;
      CSN csns[] = new CSN[2 * (max + 1)];
      long now = System.currentTimeMillis();
      for (int i=1; i<=max; i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, max, counterWindow);
      assertLimits(replicaDB, csns[1], csns[max]);
      // Now we want to test that after closing and reopening the db, the
      // counting algo is well reinitialized and when new messages are added
      // the new counter are correctly generated.
      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
      replicaDB.shutdown();
      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      assertLimits(replicaDB, csns[1], csns[max]);
      // Populate the db with 'max' msg
      for (int i=max+1; i<=2 * max; i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
      assertLimits(replicaDB, csns[1], csns[2 * max]);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      String testcase = "AFTER PURGE (oldest, newest)=";
      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
      // Clear ...
      debugInfo(tn,"clear:");
      replicaDB.clear();
      // Check the db is cleared.
      assertEquals(null, replicaDB.getOldestCSN());
      assertEquals(null, replicaDB.getNewestCSN());
      debugInfo(tn,"Success");
    }
    finally
    {
      shutdown(replicaDB);
      if (dbEnv != null)
      {
        dbEnv.shutdown();
      }
      remove(replicationServer);
      TestCaseUtils.deleteDirectory(testRoot);
    }
  }
  private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN)
  {
    final SoftAssertions softly = new SoftAssertions();
    softly.assertThat(replicaDB.getOldestCSN()).as("Wrong oldest CSN").isEqualTo(oldestCSN);
    softly.assertThat(replicaDB.getNewestCSN()).as("Wrong newest CSN").isEqualTo(newestCSN);
    softly.assertAll();
  }
  private void shutdown(FileReplicaDB replicaDB)
  {
    if (replicaDB != null)
    {
      replicaDB.shutdown();
    }
  }
  static CSN[] generateCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private void waitChangesArePersisted(FileReplicaDB replicaDB,
      int nbRecordsInserted) throws Exception
  {
    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
  }
  private void waitChangesArePersisted(FileReplicaDB replicaDB,
      int nbRecordsInserted, int counterWindow) throws Exception
  {
    // one counter record is inserted every time "counterWindow"
    // records have been inserted
    int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
    int count = 0;
    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
  }
  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
        changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
  private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "FileReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(FileReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(FileReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
New file
@@ -0,0 +1,441 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.io.RandomAccessFile;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.ByteSequenceReader;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
@SuppressWarnings("javadoc")
@Test(sequential=true)
public class LogFileTest extends DirectoryServerTestCase
{
  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME);
  static final StringRecordParser RECORD_PARSER = new StringRecordParser();
  @BeforeClass
  public void createTestDirectory()
  {
    TEST_DIRECTORY.mkdirs();
  }
  @BeforeMethod
  /**
   * Create a new log file with ten records starting from (key01, value1) until (key10, value10).
   * So log contains keys "key01", "key02", ..., "key10"
   */
  public void initialize() throws Exception
  {
    if (TEST_LOG_FILE.exists())
    {
      TEST_LOG_FILE.delete();
    }
    LogFile<String, String> logFile =  null;
    try
    {
      logFile = getLogFile(RECORD_PARSER);
      for (int i = 1; i <= 10; i++)
      {
        logFile.append(Record.from(String.format("key%02d", i), "value"+i));
      }
    }
    finally
    {
      StaticUtils.close(logFile);
    }
  }
  @AfterClass
  public void cleanTestChangelogDirectory()
  {
    StaticUtils.recursiveDelete(TEST_DIRECTORY);
  }
  private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
  {
    return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser);
  }
  @Test
  public void testCursor() throws Exception
  {
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = changelog.getCursor();
      assertThatCursorCanBeFullyRead(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
    }
  }
  @DataProvider
  Object[][] cursorPositionTo()
  {
    return new Object[][] {
      // key to position to, key matching strategy, position strategy, position is found ?,
      //    expected start index of cursor (use -1 if cursor should be exhausted), expected end index of cursor
      // equal
      { "key00", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
      { "key02", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
      { "key05", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
      { "key050", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
      { "key07", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
      { "key10", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
      { "key11", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
      { "key00", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
      { "key02", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true,  3, 10},
      { "key05", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true,  6, 10},
      { "key050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
      { "key07", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
      { "key10", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
      { "key11", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
      // less than or equal
      // key00 is a special case : position is not found but cursor is positioned on beginning
      // so it is possible to iterate on it from start to end
      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, 1, 10},
      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
      // key00 is a special case : position is not found but cursor is positioned on beginning
      // so it is possible to iterate on it from 2 to end
      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, 2, 10},
      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
      // greater than or equal
      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 1, 10},
      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 6, 10},
      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 1, 10},
      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
    };
  }
  /**
   * Test cursor positioning for a given key, matching strategy and position strategy.
   * Cursor is fully read from the expected starting index to the expected end index, unless it is expected
   * to be directly exhausted.
   */
  @Test(dataProvider="cursorPositionTo")
  public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
      boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
  {
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    LogFileCursor<String, String> cursor = null;
    try {
      cursor = changelog.getCursor();
      boolean success = cursor.positionTo(key, matchingStrategy, positionStrategy);
      assertThat(success).isEqualTo(positionShouldBeFound);
      if (cursorShouldStartAt >= 0)
      {
        assertThatCursorCanBeFullyRead(cursor, cursorShouldStartAt, cursorShouldEndAt);
      }
      else
      {
        assertThatCursorIsExhausted(cursor);
      }
    }
    finally {
      StaticUtils.close(cursor, changelog);
    }
  }
  @Test
  public void testGetOldestRecord() throws Exception
  {
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    try
    {
      Record<String, String> record = changelog.getOldestRecord();
      assertThat(record).isEqualTo(Record.from("key01", "value1"));
    }
    finally {
      StaticUtils.close(changelog);
    }
  }
  @Test
  public void testGetNewestRecord() throws Exception
  {
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    try
    {
      Record<String, String> record = changelog.getNewestRecord();
      assertThat(record).isEqualTo(Record.from("key10", "value10"));
    }
    finally {
      StaticUtils.close(changelog);
    }
  }
  @DataProvider(name = "corruptedRecordData")
  Object[][] corruptedRecordData()
  {
    return new Object[][]
    {
      // write partial record size (should be 4 bytes)
      { 1, new ByteStringBuilder().append((byte) 0) },
      // write partial record size (should be 4 bytes)
      { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) },
      // write size only
      { 3, new ByteStringBuilder().append(10) },
      // write size + key
      { 4, new ByteStringBuilder().append(100).append("key") },
      // write size + key + separator
      { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) },
      // write size + key + separator + partial value
      { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") },
    };
  }
  @Test(dataProvider="corruptedRecordData")
  public void testRecoveryOnCorruptedLogFile(
      @SuppressWarnings("unused") int unusedId,
      ByteStringBuilder corruptedRecordData) throws Exception
  {
    LogFile<String, String> logFile = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      corruptTestLogFile(corruptedRecordData);
      // open the log file: the file should be repaired at this point
      logFile = getLogFile(RECORD_PARSER);
      // write a new valid record
      logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11));
      // ensure log can be fully read including the new record
      cursor = logFile.getCursor();
      assertThatCursorCanBeFullyRead(cursor, 1, 11);
    }
    finally
    {
      StaticUtils.close(logFile);
    }
  }
  /**
   * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log
   * file.
   */
  private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception
  {
    RandomAccessFile output = null;
    try {
      output = new RandomAccessFile(TEST_LOG_FILE, "rwd");
      output.seek(output.length());
      output.write(corruptedRecordData.toByteArray());
    }
    finally
    {
      StaticUtils.close(output);
    }
  }
  @Test
  /**
   * Test that changes are visible immediately to a reader after a write.
   */
  public void testWriteAndReadOnSameLogFile() throws Exception
  {
    LogFile<String, String> writeLog = null;
    LogFile<String, String> readLog = null;
    try
    {
      writeLog = getLogFile(RECORD_PARSER);
      readLog = getLogFile(RECORD_PARSER);
      for (int i = 1; i <= 100; i++)
      {
        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
      }
    }
    finally
    {
      StaticUtils.close(writeLog, readLog);
    }
  }
  /**
   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
   * endIndex, using (keyN, valueN) where N is the index.
   */
  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    assertThat(cursor.getRecord()).isNull();
    for (int i = fromIndex; i <= endIndex; i++)
    {
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i));
    }
    assertThatCursorIsExhausted(cursor);
  }
  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
  {
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  /**
   * Record parser implementation for records with keys as String and values as
   * String, to be used in tests.
   */
  private static class StringRecordParser implements RecordParser<String, String>
  {
    private static final byte STRING_SEPARATOR = 0;
    public Record<String, String> decodeRecord(final ByteString data) throws DecodingException
    {
      ByteSequenceReader reader = data.asReader();
      String key = reader.getString(getNextStringLength(reader));
      reader.skip(1);
      String value = reader.getString(getNextStringLength(reader));
      return key.isEmpty() || value.isEmpty() ? null : Record.from(key, value);
    }
    /** Returns the length of next string by looking for the zero byte used as separator. */
    private int getNextStringLength(ByteSequenceReader reader)
    {
      int length = 0;
      while (reader.peek(length) != STRING_SEPARATOR)
      {
        length++;
      }
      return length;
    }
    public ByteString encodeRecord(Record<String, String> record)
    {
      return new ByteStringBuilder()
        .append(record.getKey()).append(STRING_SEPARATOR)
        .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
    }
    @Override
    public String decodeKeyFromString(String key) throws ChangelogException
    {
      return key;
    }
    @Override
    public String encodeKeyToString(String key)
    {
      return key;
    }
    /** {@inheritDoc} */
    @Override
    public String getMaxKey()
    {
      // '~' character has the highest ASCII value
      return "~~~~";
    }
  }
  /** A parser that can be set to fail when reading. */
  static class FailingStringRecordParser extends StringRecordParser
  {
    private boolean failToRead = false;
    @Override
    public Record<String, String> decodeRecord(ByteString data) throws DecodingException
    {
      if (failToRead)
      {
        throw new DecodingException(LocalizableMessage.raw("Error when parsing record"));
      }
      return super.decodeRecord(data);
    }
    void setFailToRead(boolean shouldFail)
    {
      failToRead = shouldFail;
    }
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
New file
@@ -0,0 +1,598 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
import java.io.File;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
@Test(sequential=true)
public class LogTest extends DirectoryServerTestCase
{
  // Use a directory dedicated to this test class
  private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  @BeforeMethod
  public void initialize() throws Exception
  {
    // Delete any previous log
    if (LOG_DIRECTORY.exists())
    {
      StaticUtils.recursiveDelete(LOG_DIRECTORY);
    }
    // Build a log with 10 records with String keys and String values
    // Keys are using the format keyNNN where N is a figure
    // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly
    Log<String, String> log = openLog(RECORD_PARSER);
    for (int i = 1; i <= 10; i++)
    {
      log.append(Record.from(String.format("key%03d", i), "value" + i));
    }
    log.close();
  }
  private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
  {
    // Each string record has a length of approximately 18 bytes
    // This size is set in order to have 2 records per log file before the rotation happens
    // This allow to ensure rotation mechanism is thoroughly tested
    // Some tests rely on having 2 records per log file (especially the purge tests), so take care
    // if this value has to be changed
    int sizeLimitPerFileInBytes = 30;
    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
  }
  @Test
  public void testCursor() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor();
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnExistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor("key005");
      assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnUnexistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      // key is between key005 and key006
      cursor = log.getCursor("key005000");
      assertThat(cursor).isNotNull();
      assertThat(cursor.getRecord()).isNull();
      assertThat(cursor.next()).isFalse();
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(null);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @DataProvider
  Object[][] cursorData()
  {
    return new Object[][] {
      // 3 first values are input data : key to position to, key matching strategy, position strategy,
      // 2 last values are expected output :
      //    first index of cursor (-1 if cursor should be exhausted), last index of cursor
      { "key000", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key001", EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
      { "key004", EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
      { "key0050", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key009", EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
      { "key010", EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
      { "key011", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key000", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key001", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
      { "key004", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
      { "key0050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key009", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
      { "key010", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key011", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key000", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key001", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
      { "key004", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
      { "key005", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
      { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
      { "key006", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
      { "key009", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
      { "key010", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
      { "key011", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
      { "key000", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key001", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
      { "key004", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
      { "key005", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
      { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
      { "key006", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 7, 10 },
      { "key009", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
      { "key010", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key011", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
      { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
      { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
    };
  }
  @Test(dataProvider="cursorData")
  public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(key, matchingStrategy, positionStrategy);
      if (cursorShouldStartAt != -1)
      {
        assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt);
      }
      else
      {
        assertThatCursorIsExhausted(cursor);
      }
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(null, null, null);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testCursorWhenParserFailsToRead() throws Exception
  {
    FailingStringRecordParser parser = new FailingStringRecordParser();
    Log<String, String> log = openLog(parser);
    parser.setFailToRead(true);
    try {
      log.getCursor("key");
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetOldestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    {
      Record<String, String> record = log.getOldestRecord();
      assertThat(record).isEqualTo(Record.from("key001", "value1"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetNewestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    {
      Record<String, String> record = log.getNewestRecord();
      assertThat(record).isEqualTo(Record.from("key010", "value10"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  /**
   * Test that changes are visible immediately to a reader after a write.
   */
  @Test
  public void testWriteAndReadOnSameLog() throws Exception
  {
    Log<String, String> writeLog = null;
    Log<String, String> readLog = null;
    try
    {
      writeLog = openLog(LogFileTest.RECORD_PARSER);
      readLog = openLog(LogFileTest.RECORD_PARSER);
      for (int i = 1; i <= 10; i++)
      {
        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
      }
    }
    finally
    {
      StaticUtils.close(writeLog, readLog);
    }
  }
  @Test
  public void testTwoConcurrentWrite() throws Exception
  {
    Log<String, String> writeLog1 = null;
    Log<String, String> writeLog2 = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      writeLog1 = openLog(LogFileTest.RECORD_PARSER);
      writeLog2 = openLog(LogFileTest.RECORD_PARSER);
      writeLog1.append(Record.from("key020", "starting record"));
      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
      write1.run();
      write2.run();
      write1.join();
      write2.join();
      if (exceptionRef.get() != null)
      {
        throw exceptionRef.get();
      }
      writeLog1.syncToFileSystem();
      cursor = writeLog1.getCursor("key020");
      for (int i = 1; i <= 61; i++)
      {
         assertThat(cursor.next()).isTrue();
      }
      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
    }
    finally
    {
      StaticUtils.close(cursor, writeLog1, writeLog2);
    }
  }
  /**
   *  This test should be disabled.
   *  Enable it locally when you need to have an rough idea of write performance.
   */
  @Test(enabled=false)
  public void logWriteSpeed() throws Exception
  {
    Log<String, String> writeLog = null;
    try
    {
      long sizeOf1MB = 1024*1024;
      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
      for (int i = 1; i < 1000000; i++)
      {
        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
      }
    }
    finally
    {
      StaticUtils.close(writeLog);
    }
  }
  @Test
  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      // advance cursor to last record to ensure it is pointing to ahead log file
      advanceCursorUpTo(cursor, 1, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
      {
        log.append(Record.from(String.format("key%03d", i), "value" + i));
      }
      // check that cursor can fully read the new records
      assertThatCursorCanBeFullyRead(cursor, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor1 = log.getCursor();
      advanceCursorUpTo(cursor1, 1, 1);
      cursor2 = log.getCursor();
      advanceCursorUpTo(cursor2, 1, 4);
      cursor3 = log.getCursor();
      advanceCursorUpTo(cursor3, 1, 9);
      cursor4 = log.getCursor();
      advanceCursorUpTo(cursor4, 1, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
      {
        log.append(Record.from(String.format("key%03d", i), "value" + i));
      }
      // check that cursors can fully read the new records
      assertThatCursorCanBeFullyRead(cursor1, 2, 20);
      assertThatCursorCanBeFullyRead(cursor2, 5, 20);
      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
    }
  }
  @Test
  public void testClear() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.clear();
      cursor = log.getCursor();
      assertThatCursorIsExhausted(cursor);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
  @Test(enabled=false, expectedExceptions=ChangelogException.class)
  public void testClearWhenCursorIsOpened() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      log.clear();
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @DataProvider(name = "purgeKeys")
  Object[][] purgeKeys()
  {
    // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor
    return new Object[][]
    {
      // lowest key of the read-only log file "key005_key006.log"
      { "key005", Record.from("key005", "value5"), 6, 10 },
      // key that is not the lowest of the read-only log file "key005_key006.log"
      { "key006", Record.from("key005", "value5"), 6, 10 },
      // lowest key of the ahead log file "ahead.log"
      { "key009", Record.from("key009", "value9"), 10, 10 },
      // key that is not the lowest of the ahead log file "ahead.log"
      { "key010", Record.from("key009", "value9"), 10, 10 },
      // key not present in log, which is between key005 and key006
      { "key005a", Record.from("key005", "value5"), 6, 10 },
      // key not present in log, which is between key006 and key007
      { "key006a", Record.from("key007", "value7"), 8, 10 },
      // key not present in log, which is lower than oldest key key001
      { "key000", Record.from("key001", "value1"), 2, 10 },
      // key not present in log, which is higher than newest key key010
      // should return the lowest key present in ahead log
      { "key011", Record.from("key009", "value9"), 10, 10 },
    };
  }
  /**
   * Given a purge key, after purge is done, expects a new cursor to point on first record provided and
   * then to be fully read starting at provided start index and finishing at provided end index.
   */
  @Test(dataProvider="purgeKeys")
  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
      int cursorStartIndex, int cursorEndIndex) throws Exception
  {
    Log<String, String> log = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
      assertThat(cursor.next()).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    for (int i = fromIndex; i <= endIndex; i++)
    {
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i));
    }
  }
  /**
   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
   * endIndex, using (keyN, valueN) where N is the index.
   */
  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    advanceCursorUpTo(cursor, fromIndex, endIndex);
    assertThatCursorIsExhausted(cursor);
  }
  /**
   * Read the cursor until exhaustion, beginning at start of cursor.
   */
  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex,
      int endIndex) throws Exception
  {
    assertThat(cursor.getRecord()).isNull();
    assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex);
  }
  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
  {
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  /** Returns a thread that write N records to the provided log. */
  private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
      final AtomicReference<ChangelogException> exceptionRef)
  {
    return new Thread() {
      public void run()
      {
        for (int i = 1; i <= 30; i++)
        {
          Record<String, String> record = Record.from(
              String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
          try
          {
            writeLog.append(record);
          }
          catch (ChangelogException e)
          {
            // keep the first exception only
            exceptionRef.compareAndSet(null, e);
          }
        }
      }
    };
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
New file
@@ -0,0 +1,327 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.assertj.core.data.MapEntry;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
@SuppressWarnings("javadoc")
public class ReplicationEnvironmentTest extends DirectoryServerTestCase
{
  private static final int SERVER_ID_1 = 1;
  private static final int SERVER_ID_2 = 2;
  private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
  private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
  private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
  private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
  @BeforeClass
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available for DN decoding.
    TestCaseUtils.startFakeServer();
  }
  @AfterClass
  public void tearDown() throws Exception
  {
    TestCaseUtils.shutdownFakeServer();
  }
  @AfterMethod
  public void cleanTestChangelogDirectory()
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    if (rootPath.exists())
    {
      StaticUtils.recursiveDelete(rootPath);
    }
  }
  @Test
  public void testReadChangelogStateWithSingleDN() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB, replicaDB2);
    }
  }
  @Test
  public void testReadChangelogStateWithMultipleDN() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
    try
    {
      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      for (int i = 0; i <= 2 ; i++)
      {
        for (int j = 1; j <= 10; j++)
        {
          // 3 domains, 10 server id each, generation id is different for each domain
          replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
        }
      }
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
      for (int i = 0; i <= 2 ; i++)
      {
        assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
      }
      assertThat(state.getDomainToGenerationId()).containsOnly(
          MapEntry.entry(domainDNs.get(0), 1L),
          MapEntry.entry(domainDNs.get(1), 2L),
          MapEntry.entry(domainDNs.get(2), 3L));
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB);
      StaticUtils.close(replicaDBs);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOffline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
      offlineStateFile.createNewFile();
      environment.readOnDiskChangelogState();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline twice
      CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
      environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
      CSN lastOfflineCSN = csnGenerator.newCSN();
      environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline
      environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
      // put server id 1 online again
      environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas()).isEmpty();
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testMissingDomainDirectory() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
    try
    {
      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
      // delete the domain directory created for the 2 replica DBs to break the
      // consistency with domain state file
      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
      environment.readOnDiskChangelogState();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB, replicaDB2);
    }
  }
}