mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.56.2014 01eb7d07467b57c61868c73e9a94bff1d0b2dcd1
OPENDJ-1389 – Add support for replication changelog DB rotation

Implemented a log based on multiple log files

* Add Log class that manage a log as a set of log files:
** it contains at least one log file, the head log file,
where new records are appended
** it contains from zero to multiple read-only log files,
issued from rotation of the head log file when it reaches
a given size
* Update LogFile class to act as part of a Log.
* Add purging feature
* Update other classes from file package with minor changes
* Add unit tests for new Log class and purging feature

* Update MeteredStream class to use it in changelog
* Add new interface RotatableLogFile (to be used later by changelog)
* Update Policy classes to use this new interface

3 files added
24 files modified
2814 ■■■■ changed files
opends/src/messages/messages/replication.properties 26 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/MeteredStream.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/MultifileTextWriter.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/RotatableLogFile.java 51 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/RotationPolicy.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/CSN.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java 69 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 55 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 1091 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 510 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java 112 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java 43 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 54 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java 7 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java 5 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java 7 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 7 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java 107 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 571 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java 17 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -118,7 +118,7 @@
SEVERE_ERR_BAD_HISTORICAL_56=Entry %s was containing some unknown historical \
 information, This may cause some inconsistency for this entry
MILD_ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE_57=A conflict was detected but the \
 conflict information could not be added. Operation: %s, Result: %s
 conflict information could not be added. Operation: %s, Result: %s
MILD_ERR_CANNOT_RENAME_CONFLICT_ENTRY_58=An error happened trying to \
 rename a conflicting entry. DN: %s, Operation: %s, Result: %s
MILD_ERR_EXCEPTION_RENAME_CONFLICT_ENTRY_59=An Exception happened when \
@@ -562,7 +562,7 @@
 on log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD_254=Could not decode a record from data \
 read in log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file(s): '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE_256=Could not create log file '%s'
SEVERE_WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE_257=The changelog '%s' has been opened in \
 read-only mode, it is not enabled for write
@@ -583,3 +583,25 @@
 Actual domain ids found in file system: '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE_265=Could not create a new domain \
 id %s for domain DN %s and save it in domain state file '%s"
SEVERE_ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE_266=Could not get reader \
 position for cursor in log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING_267=Could not decode the key from \
 string [%s]
SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG_268=When cleaning log '%s', \
 found %d cursor(s) still opened on the log
SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG_269=When closing log '%s', \
 found %d cursor(s) still opened on the log
SEVERE_ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG_270=Could not initialize \
 the log '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE_271=Could not \
 retrieve key bounds from log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST_272=Could not \
 retrieve read-only log files from log '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING_273=While purging log, could not \
 delete log file(s): '%s'
SEVERE_ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING_274 =The following log \
 '%s' must be released but it is not referenced."
SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
 head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
 size of head log file is %d bytes
opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -112,7 +113,7 @@
  /**
   * {@inheritDoc}
   */
  public boolean rotateFile(MultifileTextWriter writer)
  public boolean rotateFile(RotatableLogFile writer)
  {
    Calendar lastRotationTime = writer.getLastRotationTime();
opends/src/server/org/opends/server/loggers/MeteredStream.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
@@ -33,7 +34,7 @@
 *  (a) forwards all its output to a target stream
 *  (b) keeps track of how many bytes have been written.
 */
class MeteredStream extends OutputStream
public final class MeteredStream extends OutputStream
{
  OutputStream out;
  long written;
@@ -45,7 +46,7 @@
   * @param out     The target output stream to keep track of.
   * @param written The number of bytes written to the stream.
   */
  MeteredStream(OutputStream out, long written)
  public MeteredStream(OutputStream out, long written)
  {
    this.out = out;
    this.written = written;
@@ -111,5 +112,15 @@
  {
    out.close();
  }
  /**
   * Returns the number of bytes written in this stream.
   *
   * @return the number of bytes
   */
  public long getBytesWritten()
  {
    return written;
  }
}
opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
@@ -57,7 +58,7 @@
 * new one named in accordance with a specified FileNamingPolicy.
 */
public class MultifileTextWriter
    implements ServerShutdownListener, TextWriter,
    implements ServerShutdownListener, TextWriter, RotatableLogFile,
    ConfigurationChangeListener<SizeLimitLogRotationPolicyCfg>
{
  /**
@@ -180,7 +181,6 @@
    outputStream = new MeteredStream(stream, file.length());
    OutputStreamWriter osw = new OutputStreamWriter(outputStream, encoding);
    BufferedWriter bw = null;
    if(bufferSize <= 0)
    {
      writer = new BufferedWriter(osw);
@@ -687,11 +687,8 @@
    this.actions = actions;
  }
  /**
   * Retrieves the number of bytes written to the current log file.
   *
   * @return The number of bytes written to the current log file.
   */
  /** {@inheritDoc} */
  @Override
  public long getBytesWritten()
  {
    return outputStream.written;
@@ -719,13 +716,8 @@
    return lastCleanCount;
  }
  /**
   * Retrieves the last time a log file was rotated in this instance of
   * Directory Server. If a log rotation never
   * occurred, this value will be the time the server started.
   *
   * @return The last time log rotation occurred.
   */
  /** {@inheritDoc} */
  @Override
  public Calendar getLastRotationTime()
  {
    return lastRotationTime;
opends/src/server/org/opends/server/loggers/RotatableLogFile.java
New file
@@ -0,0 +1,51 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.loggers;
import java.util.Calendar;
/**
 * Represents a file that can be rotated based on size or on time.
 */
public interface RotatableLogFile
{
  /**
   * Retrieves the number of bytes written to the file.
   *
   * @return The number of bytes written to the file.
   */
  long getBytesWritten();
  /**
   * Retrieves the last time the file was rotated. If a file rotation never
   * occurred, this value will be the time the server started.
   *
   * @return The last time file rotation occurred.
   */
  Calendar getLastRotationTime();
}
opends/src/server/org/opends/server/loggers/RotationPolicy.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
@@ -60,14 +61,13 @@
  /**
   * This method indicates if the log file should be
   * rotated or not.
   * This method indicates if the log file should be rotated or not.
   *
   * @param writer The multi file writer writing the file to be
   *        checked.
   * @param writer
   *          the file writer to be checked.
   * @return true if the log file should be rotated, false otherwise.
   */
  public boolean rotateFile(MultifileTextWriter writer);
  public boolean rotateFile(RotatableLogFile writer);
}
opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -97,7 +98,7 @@
   * @param writer The multi file text writer writing the log file.
   * @return true if the file needs to be rotated, false otherwise.
  */
  public boolean rotateFile(MultifileTextWriter writer)
  public boolean rotateFile(RotatableLogFile writer)
  {
    long fileSize = writer.getBytesWritten();
opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
@@ -22,6 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -90,7 +91,7 @@
   * @param writer The mutli file text writer written the log file.
   * @return true if the file should be rotated, false otherwise.
   */
  public boolean rotateFile(MultifileTextWriter writer)
  public boolean rotateFile(RotatableLogFile writer)
  {
    long currInterval = TimeThread.getTime() -
        writer.getLastRotationTime().getTimeInMillis();
opends/src/server/org/opends/server/replication/common/CSN.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.common;
@@ -61,6 +61,9 @@
   */
  public static final int STRING_ENCODING_LENGTH = 28;
  /** The maximum possible value for a CSN. */
  public static final CSN MAX_CSN_VALUE = new CSN(Long.MAX_VALUE, Integer.MAX_VALUE, Short.MAX_VALUE);
  private static final long serialVersionUID = -8802722277749190740L;
  private final long timeStamp;
  /**
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
@@ -38,10 +39,11 @@
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * Logfile-based implementation of a ChangeNumberIndexDB.
 * Implementation of a ChangeNumberIndexDB with a log.
 * <p>
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
@@ -57,7 +59,7 @@
  static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
  /** The log in which records are persisted. */
  private final LogFile<Long, ChangeNumberIndexRecord> logFile;
  private final Log<Long, ChangeNumberIndexRecord> log;
  /**
   * The newest changenumber stored in the DB. It is used to avoid purging the
@@ -92,7 +94,7 @@
   */
  FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    logFile = replicationEnv.getOrCreateCNIndexDB();
    log = replicationEnv.getOrCreateCNIndexDB();
    final ChangeNumberIndexRecord newestRecord = readLastRecord();
    newestChangeNumber = getChangeNumber(newestRecord);
    // initialization of the lastGeneratedChangeNumber from the DB content
@@ -106,13 +108,13 @@
  private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
  {
    final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
    final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
    return record == null ? null : record.getValue();
  }
  private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
  {
    final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
    final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
    return record == null ? null : record.getValue();
  }
@@ -132,7 +134,7 @@
    final long changeNumber = nextChangeNumber();
    final ChangeNumberIndexRecord newRecord =
        new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
    logFile.addRecord(newRecord.getChangeNumber(), newRecord);
    log.append(Record.from(newRecord.getChangeNumber(), newRecord));
    newestChangeNumber = changeNumber;
    if (debugEnabled())
@@ -177,7 +179,7 @@
   */
  long count() throws ChangelogException
  {
    return logFile.getNumberOfRecords();
    return log.getNumberOfRecords();
  }
  /**
@@ -197,7 +199,7 @@
  @Override
  public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
  {
    return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
    return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
  }
  /**
@@ -207,7 +209,7 @@
  {
    if (shutdown.compareAndSet(false, true))
    {
      logFile.close();
      log.close();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
@@ -228,11 +230,8 @@
    {
      return null;
    }
    // TODO : no purge implemented yet as implementation is based on a single-file log.
    // The purge must be implemented once we handle a log with multiple files.
    // The purge will only delete whole files.
    return null;
    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
    return record != null ? record.getValue().getCSN() : null;
  }
  /**
@@ -321,7 +320,7 @@
   */
  public void clear() throws ChangelogException
  {
    logFile.clear();
    log.clear();
    newestChangeNumber = NO_KEY;
  }
@@ -331,15 +330,16 @@
    private static final byte STRING_SEPARATOR = 0;
    @Override
    public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
    public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
    {
      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
      return new ByteStringBuilder()
        .append(changeNumber)
        .append(record.getPreviousCookie())
        .append(record.getKey())
        .append(cnIndexRecord.getPreviousCookie())
        .append(STRING_SEPARATOR)
        .append(record.getBaseDN().toString())
        .append(cnIndexRecord.getBaseDN().toString())
        .append(STRING_SEPARATOR)
        .append(record.getCSN().toByteString()).toByteString();
        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
    }
    @Override
@@ -378,6 +378,35 @@
      }
      return length;
    }
    /** {@inheritDoc} */
    @Override
    public Long decodeKeyFromString(String key) throws ChangelogException
    {
      try
      {
        return Long.valueOf(key);
      }
      catch (NumberFormatException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public String encodeKeyToString(Long key)
    {
      return key.toString();
    }
    /** {@inheritDoc} */
    @Override
    public Long getMaxKey()
    {
      return Long.MAX_VALUE;
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,13 +49,12 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Log file implementation of the ChangelogDB interface.
@@ -106,10 +105,11 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(new LogFile.EmptyLogCursor<CSN, UpdateMsg>(), null);
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
  /**
   * Creates a new changelog DB.
@@ -122,10 +122,10 @@
   *           if a problem occurs opening the supplied directory
   */
  public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
      throws ConfigException
     throws ConfigException
  {
    this.config = config;
    this.replicationServer = replicationServer;
    this.config = config;
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /** The log in which records are persisted. */
  private final LogFile<CSN, UpdateMsg> logFile;
  private final Log<CSN, UpdateMsg> log;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
    this.baseDN = baseDN;
    this.replicationServer = replicationServer;
    this.replicationEnv = replicationEnv;
    this.logFile = createLogFile(replicationEnv);
    this.log = createLog(replicationEnv);
    this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
  private CSN readOldestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
    return record == null ? null : record.getKey();
  }
  private CSN readNewestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
    return record == null ? null : record.getKey();
  }
  private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
    return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
    log.append(Record.from(updateMsg.getCSN(), updateMsg));
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
   */
  DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
  {
    LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
    return new FileReplicaDBCursor(cursor, startAfterCSN);
  }
@@ -250,7 +250,7 @@
  {
    if (shutdown.compareAndSet(false, true))
    {
      logFile.close();
      log.close();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
@@ -270,10 +270,11 @@
    {
      return;
    }
    // TODO : no purge implemented yet, as we have a single-file log.
    // The purge must be implemented once we handle a log with multiple files.
    // The purge will only delete whole files.
    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
    if (oldestRecord != null)
    {
      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
    }
  }
  /**
@@ -346,7 +347,7 @@
  void clear() throws ChangelogException
  {
    // Remove all persisted data and reset generationId to default value
    logFile.clear();
    log.clear();
    replicationEnv.resetGenerationId(baseDN);
    csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
   */
  long getNumberRecords() throws ChangelogException
  {
    return logFile.getNumberOfRecords();
    return log.getNumberOfRecords();
  }
  /** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
    private static final DebugTracer TRACER = getTracer();
    @Override
    public ByteString encodeRecord(CSN key, UpdateMsg message)
    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
    {
      final UpdateMsg message = record.getValue();
      try
      {
        return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
        throw new DecodingException(e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public CSN decodeKeyFromString(String key) throws ChangelogException
    {
      return new CSN(key);
    }
    /** {@inheritDoc} */
    @Override
    public String encodeKeyToString(CSN key)
    {
      return key.toString();
    }
    /** {@inheritDoc} */
    @Override
    public CSN getMaxKey()
    {
      return CSN.MAX_CSN_VALUE;
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -29,7 +29,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
/**
 * A cursor on ReplicaDB.
@@ -49,7 +49,7 @@
{
  /** The underlying cursor. */
  private final LogCursor<CSN, UpdateMsg> cursor;
  private final RepositionableCursor<CSN, UpdateMsg> cursor;
  /** The next record to return. */
  private Record<CSN, UpdateMsg> nextRecord;
@@ -66,7 +66,7 @@
   *          The CSN to use as a start point (excluded from cursor, the lowest
   *          CSN higher than this CSN is used as the real start point).
   */
  FileReplicaDBCursor(LogCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
  FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
    this.cursor = cursor;
    this.lastNonNullCurrentCSN = startAfterCSN;
  }
@@ -90,7 +90,6 @@
    else
    {
      // Exhausted cursor must be able to reinitialize itself
      cursor.rewind();
      cursor.positionTo(lastNonNullCurrentCSN, true);
      nextRecord = cursor.getRecord();
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
New file
@@ -0,0 +1,1091 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.util.Reject;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A multi-file log that features appending key-value records and reading them
 * using a {@code DBCursor}.
 * The records must be appended to the log in ascending order of the keys.
 * <p>
 * A log is defined for a path - the log path - and contains one to many log files:
 * <ul>
 * <li>it has always at least one log file, the head log file, named "head.log",
 * which is used to append the records.</li>
 * <li>it may have from zero to many read-only log files, which are named after
 * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
 * the string representation of lowest and highest key present in the log file.</li>
 * </ul>
 * A read-only log file is created each time the head log file has reached the
 * maximum size limit. The head log file is then rotated to the read-only file and a
 * new empty head log file is opened. There is no limit on the number of read-only
 * files, but they can be purged.
 * <p>
 * A log is obtained using the {@code Log.openLog()} method and must always be
 * released using the {@code close()} method.
 * <p>
 * Usage example:
 * <pre>
 * {@code
 *   Log<K, V> log = null;
 *   try
 *   {
 *     log = Log.openLog(logPath, parser, maxFileSize);
 *     log.append(key, value);
 *     DBCursor<K, V> cursor = log.getCursor(someKey);
 *     // use cursor, then close cursor
 *   }
 *   finally
 *   {
 *     log.close();
 *   }
 * }
 * </pre>
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
final class Log<K extends Comparable<K>, V> implements Closeable
{
  private static final DebugTracer TRACER = getTracer();
  private static final String LOG_FILE_SUFFIX = ".log";
  static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
  private static final String LOG_FILE_NAME_SEPARATOR = "_";
  private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
          !file.getName().equals(HEAD_LOG_FILE_NAME);
    }
  };
  /** Map that holds the unique log instance for each log path. */
  private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
  /**
   * The number of references on this log instance. It is incremented each time
   * a log is opened on the same log path. The log is effectively closed only
   * when the {@code close()} method is called and this value is at most 1.
   */
  private int referenceCount;
  /** The path of directory for this log, where log files are stored. */
  private final File logPath;
  /** The parser used for encoding/decoding of records. */
  private final RecordParser<K, V> recordParser;
  /**
   * Indicates if this log is closed. When the log is closed, all methods return
   * immediately with no effect.
   */
  private boolean isClosed;
  /**
   * The log files contained in this log, ordered by key.
   * <p>
   * The head log file is always present and is associated with the maximum
   * possible key, given by the record parser.
   * <p>
   * The read-only log files are associated with the highest key they contain.
   */
  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
  /**
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  /**
   * A log file is rotated once it has exceeded this size limit. The log file can have
   * a size much larger than this limit if the last record written has a huge size.
   *
   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
   */
  private final long sizeLimitPerLogFileInBytes;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * initialize, clear, sync and close.
   */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for reads and cursor operations on this log.
   */
  private final Lock sharedLock;
  /**
   * Open a log with the provided log path, record parser and maximum size per
   * log file.
   * <p>
   * If no log exists for the provided path, a new one is created.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param logPath
   *          Path of the log.
   * @param parser
   *          Parser for encoding/decoding of records.
   * @param sizeLimitPerFileInBytes
   *          Limit in bytes before rotating the head log file of the log.
   * @return a log
   * @throws ChangelogException
   *           If a problem occurs during initialization.
   */
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
  {
    Reject.ifNull(logPath, parser);
    @SuppressWarnings("unchecked")
    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
    if (log == null)
    {
      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
      logsCache.put(logPath, log);
    }
    else
    {
      // TODO : check that parser and size limit are compatible with the existing one,
      // and issue a warning if it is not the case
      log.referenceCount++;
    }
    return log;
  }
  /**
   * Release a reference to the log corresponding to provided path. The log is
   * closed if this is the last reference.
   */
  private static synchronized void releaseLog(final File logPath)
  {
    Log<?, ?> log = logsCache.get(logPath);
    if (log == null)
    {
      // this should never happen
      ErrorLogger.logError(
          ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
      return;
    }
    if (log.referenceCount > 1)
    {
      log.referenceCount--;
    }
    else
    {
      log.doClose();
      logsCache.remove(logPath);
    }
  }
  /**
   * Creates a new log.
   *
   * @param logPath
   *            The directory path of the log.
   * @param parser
   *          Parser of records.
   * @param sizeLimitPerFile
   *            Limit in bytes before rotating a log file.
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
      throws ChangelogException
  {
    this.logPath = logPath;
    this.recordParser = parser;
    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
    this.exclusiveLock = lock.writeLock();
    this.sharedLock = lock.readLock();
    createOrOpenLogFiles();
  }
  /** Create or open log files used by this log. */
  private void createOrOpenLogFiles() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      createRootDirIfNotExists();
      openHeadLogFile();
      for (final File file : getReadOnlyLogFiles())
      {
        openReadOnlyLogFile(file);
      }
      isClosed = false;
    }
    catch (ChangelogException e)
    {
      // ensure all log files opened at this point are closed
      close();
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  private File[] getReadOnlyLogFiles() throws ChangelogException
  {
    File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
    if (files == null)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
    }
    return files;
  }
  private void createRootDirIfNotExists() throws ChangelogException
  {
    if (!logPath.exists())
    {
      if (!logPath.mkdirs())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
      }
    }
  }
  /**
   * Returns the path of this log.
   *
   * @return the path of this log directory
   */
  public File getPath()
  {
    return logPath;
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   *
   * @param record
   *          The record to add.
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // If this exclusive lock happens to be a bottleneck :
    // 1. use a shared lock for appending the record first
    // 2. switch to an exclusive lock only if rotation is needed
    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
      {
        ErrorLogger.logError(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
      }
      headLogFile.append(record);
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Synchronize all records added with the file system, ensuring that records
   * are effectively persisted.
   * <p>
   * After a successful call to this method, it is guaranteed that all records
   * added to the log are persisted to the file system.
   *
   * @throws ChangelogException
   *           If the synchronization fails.
   */
  public void syncToFileSystem() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      getHeadLogFile().syncToFileSystem();
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   * <p>
   * The returned cursor initially points to record corresponding to the first
   * key, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the first key before any call to {@code cursor.next()}
   * method.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getCursor() throws ChangelogException
  {
    LogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor.positionTo(null, false);
      registerCursor(cursor);
      return cursor;
    }
    catch (ChangelogException e)
    {
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key,
   * that is {@code cursor.getRecord()} is equals to the record corresponding to
   * the key before any call to {@code cursor.next()} method.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
   *          {@code null}, cursor will point at the first record of the log.
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, false);
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key
   * found, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the key found before any call to {@code cursor.next()}
   * method.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
   *          {@code null}, cursor will point at the first record of the log.
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  {
    return getCursor(key, true);
  }
  /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
  private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException
  {
    if (key == null)
    {
      return getCursor();
    }
    LogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      final boolean isFound = cursor.positionTo(key, findNearest);
      // for nearest case, it is ok if the target is not found
      if (isFound || findNearest)
      {
        registerCursor(cursor);
        return cursor;
      }
      else
      {
        StaticUtils.close(cursor);
        return new EmptyLogCursor<K, V>();
      }
    }
    catch (ChangelogException e)
    {
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns the oldest (first) record from this log.
   *
   * @return the oldest record, which may be {@code null} if there is no record
   *         in the log.
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getOldestRecord() throws ChangelogException
  {
    return getOldestLogFile().getOldestRecord();
  }
  /**
   * Returns the newest (last) record from this log.
   *
   * @return the newest record, which may be {@code null}
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getNewestRecord() throws ChangelogException
  {
    return getHeadLogFile().getNewestRecord();
  }
  /**
   * Returns the number of records in the log.
   *
   * @return the number of records
   * @throws ChangelogException
   *            If a problem occurs.
   */
  public long getNumberOfRecords() throws ChangelogException
  {
    long count = 0;
    for (final LogFile<K, V> logFile : logFiles.values())
    {
      count += logFile.getNumberOfRecords();
    }
    return count;
  }
  /**
   * Purge the log up to and excluding the provided key.
   *
   * @param purgeKey
   *            the key up to which purging must happen
   * @return the oldest non purged record, or {@code null}
   *         if no record was purged
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return null;
      }
      final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
      if (logFilesToPurge.isEmpty())
      {
        return null;
      }
      final List<String> undeletableFiles = new ArrayList<String>();
      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
      while (entriesToPurge.hasNext())
      {
        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
        try
        {
          logFile.close();
          logFile.delete();
          entriesToPurge.remove();
        }
        catch (ChangelogException e)
        {
          // The deletion of log file on file system has failed
          undeletableFiles.add(logFile.getFile().getPath());
        }
      }
      if (!undeletableFiles.isEmpty())
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
                StaticUtils.listToString(undeletableFiles, ", ")));
      }
      return getOldestRecord();
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Empties the log, discarding all records it contains.
   *
   * @throws ChangelogException
   *           If cursors are opened on this log, or if a problem occurs during
   *           clearing operation.
   */
  public void clear() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (!openCursors.isEmpty())
      {
        // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
        // there is one open cursor when clearing.
        // Switch to logging until this issue is solved
        // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
        //    logPath.getPath(), openCursors.size()));
        ErrorLogger.logError(
            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
      }
      // delete all log files
      final List<String> undeletableFiles = new ArrayList<String>();
      for (LogFile<K, V> logFile : logFiles.values())
      {
        try
        {
          logFile.close();
          logFile.delete();
        }
        catch (ChangelogException e)
        {
          undeletableFiles.add(logFile.getFile().getPath());
        }
      }
      if (!undeletableFiles.isEmpty())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
            StaticUtils.listToString(undeletableFiles, ", ")));
      }
      logFiles.clear();
      // recreate an empty head log file
      openHeadLogFile();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    releaseLog(logPath);
  }
  /** Effectively close this log. */
  private void doClose()
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (!openCursors.isEmpty())
      {
        ErrorLogger.logError(
            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
      }
      StaticUtils.close(logFiles.values());
      isClosed = true;
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  private LogFile<K, V> getHeadLogFile()
  {
    return logFiles.lastEntry().getValue();
  }
  private LogFile<K, V> getOldestLogFile()
  {
    return logFiles.firstEntry().getValue();
  }
  /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
  private void rotateHeadLogFile() throws ChangelogException
  {
    final LogFile<K, V> headLogFile = getHeadLogFile();
    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
    headLogFile.close();
    renameHeadLogFileTo(readOnlyLogFile);
    openHeadLogFile();
    openReadOnlyLogFile(readOnlyLogFile);
    updateCursorsOpenedOnHead();
  }
  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
  {
    final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
    try
    {
      StaticUtils.renameFile(headLogFile, rotatedLogFile);
    }
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
    }
  }
  /**
   * Returns the key bounds for the provided log file.
   *
   * @return the pair of (lowest key, highest key) that correspond to records
   *         stored in the corresponding log file.
   * @throws ChangelogException
   *            if an error occurs while retrieving the keys
   */
   private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
   {
     try
     {
       final String name = logFile.getFile().getName();
       final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
           .split(LOG_FILE_NAME_SEPARATOR);
       return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
     }
     catch (Exception e)
     {
       throw new ChangelogException(
           ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
     }
   }
   /**
    * Returns the file name to use for the read-only version of the provided
    * log file.
    * <p>
    * The file name is based on the lowest and highest key in the log file.
    *
    * @return the name to use for the read-only version of the log file
    * @throws ChangelogException
    *            If an error occurs.
    */
  private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
  {
    final K lowestKey = logFile.getOldestRecord().getKey();
    final K highestKey = logFile.getNewestRecord().getKey();
    return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
  }
  /** Update the open cursors after a rotation of the head log file. */
  private void updateCursorsOpenedOnHead() throws ChangelogException
  {
    for (LogCursor<K, V> cursor : openCursors)
    {
      final CursorState<K, V> state = cursor.getState();
      // Need to update the cursor only if it is pointing to the head log file
      if (isHeadLogFile(state.logFile))
      {
        updateOpenCursor(cursor, state);
      }
    }
  }
  /**
   * Update the provided open cursor with the provided state.
   * <p>
   * The cursor must report the previous state on the head log file to the same
   * state (position in file, current record) in the read-only log file just created.
   */
  private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
  {
    final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
    final LogFile<K, V> logFile = findLogFileFor(previousKey);
    cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
  }
  private void openHeadLogFile() throws ChangelogException
  {
    logFiles.put(recordParser.getMaxKey(),
        LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser));
  }
  private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
  {
    final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
    final Pair<K, K> bounds = getKeyBounds(logFile);
    logFiles.put(bounds.getSecond(), logFile);
  }
  private void registerCursor(final LogCursor<K, V> cursor)
  {
    openCursors.add(cursor);
  }
  private void unregisterCursor(final LogCursor<K, V> cursor)
  {
    openCursors.remove(cursor);
  }
  /**
   * Returns the log file that is just after the provided log file wrt the order
   * defined on keys, or {@code null} if the provided log file is the last one
   * (the head log file).
   */
  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
  {
    if (isHeadLogFile(currentLogFile))
    {
      return null;
    }
    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
    return logFiles.higherEntry(bounds.getSecond()).getValue();
  }
  private boolean isHeadLogFile(final LogFile<K, V> logFile)
  {
    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
  }
  /** Returns the log file that should contain the provided key. */
  private LogFile<K, V> findLogFileFor(final K key)
  {
    if (key == null || logFiles.lowerKey(key) == null)
    {
      return getOldestLogFile();
    }
    return logFiles.ceilingEntry(key).getValue();
  }
  /**
   * Represents a cursor than can be repositioned on a given key.
   */
  static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
  {
    /**
     * Position the cursor to the record corresponding to the provided key or to
     * the nearest key (the lowest key higher than the provided key).
     *
     * @param key
     *          Key to use as a start position for the cursor. If key is
     *          {@code null}, use the key of the first record instead.
     * @param findNearest
     *          If {@code true}, start position is the lowest key that is higher
     *          than the provided key, otherwise start position is the provided
     *          key.
     * @return {@code true} if cursor is successfully positionned to the key or
     *         the nearest key, {@code false} otherwise.
     * @throws ChangelogException
     *           If an error occurs when positioning cursor.
     */
    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
  }
  /**
   * Implements a cursor on the log.
   * <p>
   * The cursor initially points to a record, that is {@code cursor.getRecord()}
   * is equals to the first record available from the cursor before any call to
   * {@code cursor.next()} method.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   */
  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    private final Log<K, V> log;
    private LogFile<K, V> currentLogFile;
    private LogFileCursor<K, V> currentCursor;
    /**
     * Creates a cursor on the provided log.
     *
     * @param log
     *           The log on which the cursor read records.
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    private LogCursor(final Log<K, V> log) throws ChangelogException
    {
      this.log = log;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K, V> getRecord()
    {
      return currentCursor.getRecord();
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return true;
        }
        return false;
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        StaticUtils.close(currentCursor);
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        final LogFile<K, V> logFile = log.findLogFileFor(key);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        if (key != null)
        {
          boolean isFound = currentCursor.positionTo(key, findNearest);
          if (isFound && getRecord() == null)
          {
            // The key to position to may be in the next file, force the switch
            isFound = next();
          }
          return isFound;
        }
        return true;
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** Returns the state of this cursor. */
    private CursorState<K, V> getState() throws ChangelogException
    {
      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
    }
    /** Reinitialize this cursor to the provided state. */
    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      StaticUtils.close(currentCursor);
      currentLogFile = cursorState.logFile;
      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
    }
    /** Switch the cursor to the provided log file. */
    private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
    {
      StaticUtils.close(currentCursor);
      currentLogFile = logFile;
      currentCursor = currentLogFile.getCursor();
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
    }
  }
  /**
   * Represents the state of a cursor.
   * <p>
   * The state is used to update a cursor when rotating the head log file : the
   * state of cursor on head log file must be reported to the new read-only log
   * file that is created when rotating.
   */
  private static class CursorState<K extends Comparable<K>, V>
  {
    /** The log file. */
    private final LogFile<K, V> logFile;
    /**
     * The position of the reader on the log file. It is the offset from the
     * beginning of the file, in bytes, at which the next read occurs.
     */
    private final long filePosition;
    /** The record the cursor is pointing to. */
    private final Record<K,V> record;
    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
    {
      this.logFile = logFile;
      this.filePosition = filePosition;
      this.record = record;
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -25,8 +25,8 @@
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -35,24 +35,30 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
/**
 * A file-based log that allow to append key-value records and
 * read them using a {@code DBCursor}.
 * A log file, containing part of a {@code Log}. The log file may be:
 * <ul>
 * <li>write-enabled : allowing to append key-value records and read records
 * from cursors,</li>
 * <li>read-only : allowing to read records from cursors.</li>
 * </ul>
 * <p>
 * A log file is NOT intended to be used directly, but only has part of a
 * {@code Log}. In particular, there is no concurrency management and no checks
 * to ensure that log is not closed when performing any operation on it. Those
 * are managed at the {@code Log} level.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
@@ -61,16 +67,9 @@
 */
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
  private static final DebugTracer TRACER = getTracer();
  // Non private for use in tests
  static final String LOG_FILE_NAME = "current.log";
  /** The path of directory that contains the log file. */
  private final File rootPath;
  /** The log file containing the records. */
  /** The file containing the records. */
  private final File logfile;
  /** The parser of records, to convert bytes to record and record to bytes. */
@@ -79,30 +78,20 @@
  /** The pool to obtain a reader on the log. */
  private LogReaderPool readerPool;
  /** The writer on the log, which may be {@code null} if log is not write-enabled */
  /**
   * The writer on the log file, which may be {@code null} if log file is not
   * write-enabled
   */
  private LogWriter writer;
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
  /** Indicates if the log is closed. */
  private volatile boolean isClosed;
  /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for read, write and flush operations on this log file.
   * Write and flush operations can be shared because they're synchronized in the underlying writer.
   * Reads can be done safely when writing because partially written records are handled.
   */
  private final Lock sharedLock;
  /**
   * Creates a new log file.
   *
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @param isWriteEnabled
@@ -111,17 +100,13 @@
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
  private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
      throws ChangelogException
  {
    this.rootPath = rootPath;
    Reject.ifNull(logFilePath, parser);
    this.logfile = logFilePath;
    this.parser = parser;
    this.isWriteEnabled = isWriteEnabled;
    this.logfile = new File(rootPath, LOG_FILE_NAME);
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
    this.exclusiveLock = lock.writeLock();
    this.sharedLock = lock.readLock();
    initialize();
  }
@@ -133,18 +118,18 @@
   *            Type of the key of a record, which must be comparable.
   * @param <V>
   *            Type of the value of a record.
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a read-only log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
  static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(rootPath, parser, false);
    return new LogFile<K, V>(logFilePath, parser, false);
  }
  /**
@@ -155,18 +140,18 @@
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a write-enabled log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
  static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(rootPath, parser, true);
    return new LogFile<K, V>(logFilePath, parser, true);
  }
  /**
@@ -180,89 +165,29 @@
   */
  private void initialize() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    createLogFileIfNotExists();
    if (isWriteEnabled)
    {
      createRootDirIfNotExists();
      createLogFileIfNotExists();
      isClosed = false;
      if (isWriteEnabled)
      {
        writer = LogWriter.acquireWriter(logfile);
      }
      readerPool = new LogReaderPool(logfile);
      writer = new LogWriter(logfile);
    }
    finally
    {
      exclusiveLock.unlock();
    }
    readerPool = new LogReaderPool(logfile);
  }
  /**
   * Returns the name of this log.
   * Returns the file containing the records.
   *
   * @return the name, which corresponds to the directory containing the log
   * @return the file
   */
  public String getName()
  File getFile()
  {
    return logfile.getParent().toString();
  }
  /**
   * Empties the log, discarding all records it contains.
   * <p>
   * This method should not be called with open cursors or
   * when multiple instances of the log are opened.
   *
   * @throws ChangelogException
   *            If a problem occurs.
   */
  public void clear() throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      close();
      final boolean isDeleted = logfile.delete();
      if (!isDeleted)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
      }
      initialize();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
    }
    finally
    {
      exclusiveLock.unlock();
    }
    return logfile;
  }
  private void checkLogIsEnabledForWrite() throws ChangelogException
  {
    if (!isWriteEnabled)
    {
      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
    }
  }
  private void createRootDirIfNotExists() throws ChangelogException
  {
    if (!rootPath.exists())
    {
      final boolean created = rootPath.mkdirs();
      if (!created)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
      }
      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
    }
  }
@@ -282,25 +207,6 @@
  }
  /**
   * Add a record at the end of this log from the provided key and value.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   *
   * @param key
   *          The key of the record.
   * @param value
   *          The value of the record.
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  public void addRecord(final K key, final V value) throws ChangelogException
  {
    addRecord(Record.from(key, value));
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
@@ -312,33 +218,22 @@
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  public void addRecord(final Record<K, V> record) throws ChangelogException
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      writer.write(encodeRecord(record));
      writer.flush();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
    }
    finally
    {
      sharedLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
    }
  }
  private ByteString encodeRecord(final Record<K, V> record)
  {
    final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
    final ByteString data = parser.encodeRecord(record);
    return new ByteStringBuilder()
      .append(data.length())
      .append(data)
@@ -346,14 +241,14 @@
  }
  /**
   * Dump this log as text file, intended for debugging purpose only.
   * Dump this log file as a text file, intended for debugging purpose only.
   *
   * @param dumpFile
   *          File that will contains log records using a human-readable format
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  public void dumpAsTextFile(File dumpFile) throws ChangelogException
  void dumpAsTextFile(File dumpFile) throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = getCursor();
    BufferedWriter textWriter = null;
@@ -364,7 +259,7 @@
      {
        Record<K, V> record = cursor.getRecord();
        textWriter.write("key=" + record.getKey());
        textWriter.write(" -- ");
        textWriter.write(" | ");
        textWriter.write("value=" + record.getValue());
        textWriter.write('\n');
        cursor.next();
@@ -374,7 +269,7 @@
    {
      // No I18N needed, used for debugging purpose only
      throw new ChangelogException(
          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e);
    }
    finally
    {
@@ -392,20 +287,16 @@
   * @throws ChangelogException
   *           If the synchronization fails.
   */
  public void syncToFileSystem() throws ChangelogException
  void syncToFileSystem() throws ChangelogException
  {
    exclusiveLock.lock();
    checkLogIsEnabledForWrite();
    try
    {
      writer.sync();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
    }
    finally
    {
      exclusiveLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
    }
  }
@@ -422,21 +313,9 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getCursor() throws ChangelogException
  LogFileCursor<K, V> getCursor() throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      return new LogFileCursor<K, V>(this);
    }
    finally
    {
      sharedLock.unlock();
    }
    return new LogFileCursor<K, V>(this);
  }
  /**
@@ -454,7 +333,7 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getCursor(final K key) throws ChangelogException
  LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, false);
  }
@@ -476,13 +355,13 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  {
    return getCursor(key, true);
  }
  /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
  private LogCursor<K, V> getCursor(final K key, boolean findNearest)
  private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
      throws ChangelogException
  {
    if (key == null)
@@ -490,13 +369,8 @@
      return getCursor();
    }
    LogFileCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogFileCursor<K, V>(this);
      cursor.positionTo(key, findNearest);
      // if target is not found, cursor is positioned at end of stream
@@ -506,10 +380,22 @@
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns a cursor initialised to the provided record and position in file.
   *
   * @param record
   *            The initial record this cursor points on
   * @param position
   *            The file position this cursor points on
   * @return the cursor
   * @throws ChangelogException
   *            If a problem occurs while creating the cursor.
   */
  LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
  {
    return new LogFileCursor<K, V>(this, record, position);
  }
  /**
@@ -520,7 +406,7 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getOldestRecord() throws ChangelogException
  Record<K, V> getOldestRecord() throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = null;
    try
@@ -541,7 +427,7 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getNewestRecord() throws ChangelogException
  Record<K, V> getNewestRecord() throws ChangelogException
  {
    // TODO : need a more efficient way to retrieve it
    DBCursor<Record<K, V>> cursor = null;
@@ -597,45 +483,58 @@
  /** {@inheritDoc} */
  public void close()
  {
    exclusiveLock.lock();
    try
    if (isWriteEnabled)
    {
      if (isClosed)
      try
      {
        return;
        syncToFileSystem();
      }
      catch (ChangelogException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      writer.close();
    }
    readerPool.shutdown();
  }
      if (isWriteEnabled)
      {
        try
        {
          syncToFileSystem();
        }
        catch (ChangelogException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        writer.close();
      }
      readerPool.shutdown();
      isClosed = true;
    }
    finally
  /**
   * Delete this log file (file is physically removed). Should be called only
   * when log file is closed.
   *
   * @throws ChangelogException
   *            If log file can't be deleted.
   */
  void delete() throws ChangelogException
  {
    final boolean isDeleted = logfile.delete();
    if (!isDeleted)
    {
      exclusiveLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
    }
  }
  /**
   * Return the size of this log file in bytes.
   *
   * @return the size of log file
   */
  long getSizeInBytes()
  {
    return writer.getBytesWritten();
  }
  /** The path of this log file as a String. */
  private String getPath()
  {
    return logfile.getPath();
  }
  /** Read a record from the provided reader. */
  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return null;
      }
      final ByteString recordData = readEncodedRecord(reader);
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
@@ -643,10 +542,6 @@
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
@@ -672,26 +567,17 @@
    }
  }
  /** Seek to provided position on the provided reader. */
  /** Seek to given position on the provided reader. */
  private void seek(RandomAccessFile reader, long position) throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      reader.seek(position);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
@@ -706,66 +592,42 @@
  /** Release the provided reader. */
  private void releaseReader(RandomAccessFile reader) {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      readerPool.release(reader);
    }
    finally
    {
      sharedLock.unlock();
    }
    readerPool.release(reader);
  }
  /**
   * A cursor on the log.
   */
  static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    /**
     * Position the cursor to the record corresponding to the provided key or to
     * the nearest key (the lowest key higher than the provided key).
     * <p>
     * The record is only searched forward. To search backward, it is first
     * necessary to call the {@code rewind()} method to start from beginning of
     * log file.
     *
     * @param key
     *          Key to use as a start position for the cursor. If key is
     *          {@code null}, use the key of the first record instead.
     * @param findNearest
     *          If {@code true}, start position is the lowest key that is higher
     *          than the provided key, otherwise start position is the provided
     *          key.
     * @return {@code true} if cursor is successfully positionned to the key or
     *         the the nearest key, {@code false} otherwise.
     * @throws ChangelogException
     *           If an error occurs when positioning cursor.
     */
    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
    return logfile.hashCode();
  }
    /**
     * Rewind the cursor, positioning it to the beginning of the log file,
     * pointing to no record initially.
     *
     * @throws ChangelogException
     *          If an error occurs when rewinding to zero.
     */
    void rewind() throws ChangelogException;
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object that)
  {
    if (this == that)
    {
      return true;
    }
    if (!(that instanceof LogFile))
    {
      return false;
    }
    final LogFile<?, ?> other = (LogFile<?, ?>) that;
    return logfile.equals(other.logfile);
  }
  /**
   * Implements a cursor on the log.
   * Implements a repositionable cursor on the log file.
   * <p>
   * The cursor initially points to a record, that is {@code cursor.getRecord()}
   * is equals to the first record available from the cursor before any call to
   * {@code cursor.next()} method.
   */
  private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** The underlying log on which entries are read. */
    private final LogFile<K, V> logFile;
@@ -784,7 +646,7 @@
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
    private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
@@ -800,17 +662,19 @@
      }
    }
    /** {@inheritDoc} */
    public String toString()
    /**
     * Creates a cursor on the provided log, initialised to the provided record and
     * pointing to the provided file position.
     * <p>
     * Note: there is no check to ensure that provided record and file position are
     * consistent. It is the responsability of the caller of this method.
     */
    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
    {
      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
    }
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return currentRecord;
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      logFile.seek(reader, filePosition);
    }
    /** {@inheritDoc} */
@@ -823,6 +687,13 @@
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return currentRecord;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      do
      {
@@ -848,64 +719,35 @@
    /** {@inheritDoc} */
    @Override
    public void rewind() throws ChangelogException
    {
      logFile.seek(reader, 0);
      currentRecord = null;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      logFile.releaseReader(reader);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    /**
     * Returns the file position this cursor is pointing at.
     *
     * @return the position of reader on the log file
     * @throws ChangelogException
     *          If an error occurs.
     */
    long getFilePosition() throws ChangelogException
    {
      return null;
      try
      {
        return reader.getFilePointer();
      }
      catch (IOException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void rewind() throws ChangelogException
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -25,18 +25,14 @@
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.SyncFailedException;
import java.util.HashMap;
import java.util.Map;
import org.opends.server.loggers.MeteredStream;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.util.StaticUtils;
@@ -45,80 +41,38 @@
/**
 * A writer on a log file.
 * <p>
 * The writer is cached in order to have a single writer per file in the JVM.
 */
class LogWriter extends OutputStream
{
  /** The cache of log writers. There is a single writer per file in the JVM.  */
  private static final Map<File, LogWriter> logWritersCache = new HashMap<File, LogWriter>();
  /** The exclusive lock used to acquire or close a log writer. */
  private static final Object lock = new Object();
  /** The file to write in. */
  private final File file;
  /** The stream to write data in the file. */
  private final BufferedOutputStream stream;
  /** The stream to write data in the file, capable of counting bytes written. */
  private final MeteredStream stream;
  /** The file descriptor on the file. */
  private final FileDescriptor fileDescriptor;
  /** The number of references on this writer. */
  private int referenceCount;
  /**
   * Creates a writer on the provided file.
   *
   * @param file
   *          The file to write.
   * @param stream
   *          The stream to write in the file.
   * @param fileDescriptor
   *          The descriptor on the file.
   * @throws ChangelogException
   *            If a problem occurs at creation.
   */
  private LogWriter(final File file, BufferedOutputStream stream, FileDescriptor fileDescriptor)
      throws ChangelogException
  public LogWriter(final File file) throws ChangelogException
  {
    this.file = file;
    this.stream = stream;
    this.fileDescriptor = fileDescriptor;
    this.referenceCount = 1;
  }
  /**
   * Returns a log writer on the provided file, creating it if necessary.
   *
   * @param file
   *            The log file to write in.
   * @return the log writer
   * @throws ChangelogException
   *            If a problem occurs.
   */
  public static LogWriter acquireWriter(File file) throws ChangelogException
  {
    synchronized (lock)
    try
    {
      LogWriter logWriter = logWritersCache.get(file);
      if (logWriter == null)
      {
        try
        {
          final FileOutputStream stream = new FileOutputStream(file, true);
          logWriter = new LogWriter(file, new BufferedOutputStream(stream), stream.getFD());
        }
        catch (Exception e)
        {
          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
        }
        logWritersCache.put(file, logWriter);
      }
      else
      {
        logWriter.incrementRefCounter();
      }
      return logWriter;
      FileOutputStream fos = new FileOutputStream(file, true);
      this.stream = new MeteredStream(fos, file.length());
      this.fileDescriptor = fos.getFD();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
    }
  }
@@ -157,11 +111,14 @@
    bs.copyTo(stream);
  }
  /** {@inheritDoc} */
  @Override
  public void flush() throws IOException
  /**
   * Returns the number of bytes written in the underlying file.
   *
   * @return the number of bytes
   */
  public long getBytesWritten()
  {
    stream.flush();
    return stream.getBytesWritten();
  }
  /**
@@ -178,32 +135,7 @@
  @Override
  public void close()
  {
    synchronized (lock)
    {
      LogWriter writer = logWritersCache.get(file);
      if (writer == null)
      {
        // writer is already closed
        return;
      }
      // counter == 0 should never happen
      if (referenceCount == 0 || referenceCount == 1)
      {
        StaticUtils.close(stream);
        logWritersCache.remove(file);
        referenceCount = 0;
      }
      else
      {
        referenceCount--;
      }
    }
    StaticUtils.close(stream);
  }
  private void incrementRefCounter()
  {
    referenceCount++;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -25,6 +25,7 @@
 */
package org.opends.server.replication.server.changelog.file;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
/**
@@ -40,7 +41,6 @@
 */
interface RecordParser<K, V>
{
  /**
   * Decode a record from the provided byte array.
   * <p>
@@ -57,16 +57,45 @@
  Record<K, V> decodeRecord(ByteString data) throws DecodingException;
  /**
   * Encode the provided key and value to a byte array.
   * Encode the provided record to a byte array.
   * <p>
   * The returned array is intended to be stored as provided in the log file.
   *
   * @param key
   *          The key of the record.
   * @param value
   *          The value of the record.
   * @param record
   *          The record to encode.
   * @return the bytes array representing the (key,value) record
   */
  ByteString encodeRecord(K key, V value);
  ByteString encodeRecord(Record<K, V> record);
  /**
   * Read the key from the provided string.
   *
   * @param key
   *          The string representation of key, suitable for use in a filename,
   *          as written by the {@code encodeKeyToString()} method.
   * @return the key
   * @throws ChangelogException
   *           If key can't be read from the string.
   */
  K decodeKeyFromString(String key) throws ChangelogException;
  /**
   * Returns the provided key as a string that is suitable to be used in a
   * filename.
   *
   * @param key
   *          The key of a record.
   * @return a string encoding the key, unambiguously decodable to the original
   *         key, and suitable for use in a filename. The string should contain
   *         only ASCII characters and no space.
   */
  String encodeKeyToString(K key);
  /**
   * Returns a key that is guaranted to be always higher than any other key.
   *
   * @return the highest possible key
   */
  K getMaxKey();
}
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -55,9 +55,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
/**
@@ -82,7 +80,9 @@
 * id of the server. Each directory contains the log files for the given server
 * id.</li>
 * </ul>
 * All log files end with the ".log" suffix.
 * All log files end with the ".log" suffix. Log files always include the "head.log"
 * file and optionally zero to many read-only log files named after the lowest key
 * and highest key present in the log file.
 * <p>
 * Layout example with two domains "o=test1" and "o=test2", each having server
 * ids 22 and 33 :
@@ -91,25 +91,29 @@
 * +---changelog
 * |   \---domains.state  [contains mapping: 1 => "o=test1", 2 => "o=test2"]
 * |   \---changenumberindex
 * |      \--- current.log
 * |      \--- head.log [contains last records written]
 * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
 * |   \---1.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---current.log
 * |           \---head.log
 * |       \---33.server
 * |           \---current.log
 * |           \---head.log
 * |   \---2.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---current.log
 * |           \---head.log
 * |       \---33.server
 * |           \---current.log
 * |           \---head.log
 * </pre>
 */
class ReplicationEnvironment
{
  private static final DebugTracer TRACER = getTracer();
  // TODO : to replace by configurable value
  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024;
  private static final int NO_GENERATION_ID = -1;
  private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
@@ -159,7 +163,7 @@
  private final String replicationRootPath;
  /** The list of logs that are in use. */
  private final List<LogFile<?, ?>> logs = new CopyOnWriteArrayList<LogFile<?, ?>>();
  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
  /** Maps each domain DN to a domain id that is used to name directory in file system. */
  private final Map<DN, String> domains = new HashMap<DN, String>();
@@ -228,7 +232,7 @@
   * @throws ChangelogException
   *           if an error occurs.
   */
  LogFile<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
  Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
      throws ChangelogException
  {
    if (debugEnabled())
@@ -276,7 +280,7 @@
   * @throws ChangelogException
   *           when a problem occurs.
   */
  LogFile<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
  Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
  {
    final File path = getCNIndexDBPath();
    try
@@ -305,24 +309,6 @@
  }
  /**
   * Clears the content of replication database.
   *
   * @param log
   *          The log to clear.
   */
  void clearDB(final LogFile<?, ?> log)
  {
    try
    {
      log.clear();
    }
    catch (ChangelogException e)
    {
      logError(ERR_ERROR_CLEARING_DB.get(log.getName(), stackTraceToSingleLineString(e)));
    }
  }
  /**
   * Clears the generated id associated to the provided domain DN from the state
   * Db.
   * <p>
@@ -506,12 +492,12 @@
  }
  /** Open a log from the provided path and record parser. */
  private <K extends Comparable<K>, V> LogFile<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
      throws ChangelogException
  {
    checkShutDownBeforeOpening(serverIdPath);
    final LogFile<K, V> log = LogFile.newAppendableLogFile(serverIdPath, parser);
    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
    checkShutDownAfterOpening(serverIdPath, log);
@@ -519,11 +505,11 @@
    return log;
  }
  private void checkShutDownAfterOpening(final File serverIdPath, final LogFile<?, ?> log) throws ChangelogException
  private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
  {
    if (isShuttingDown.get())
    {
      closeDB(log);
      closeLog(log);
      throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
          replicationServer.getServerId()));
    }
@@ -590,7 +576,7 @@
    return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
  }
  private void closeDB(final LogFile<?, ?> log)
  private void closeLog(final Log<?, ?> log)
  {
    logs.remove(log);
    log.close();
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1101,6 +1101,13 @@
    return new File(testResourceDir, filename);
  }
  public static File getUnitTestRootPath()
  {
    final String buildRoot = System.getProperty(PROPERTY_BUILD_ROOT);
    final String path = System.getProperty(PROPERTY_BUILD_DIR, buildRoot + File.separator + "build");
    return new File(path, "unit-tests");
  }
  /**
   * Prevent instantiation.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication;
@@ -79,6 +79,7 @@
        + "cn: Replication Server\n"
        + "ds-cfg-replication-port: " + replServerPort + "\n"
        + "ds-cfg-replication-db-directory: ChangeNumberControlDbTest\n"
        + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
        + "ds-cfg-replication-server-id: 103\n";
    // suffix synchronized
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication;
@@ -100,6 +100,7 @@
        + "cn: Replication Server\n"
        + "ds-cfg-replication-port:" + replServerPort + "\n"
        + "ds-cfg-replication-db-directory: ReSyncTest\n"
        + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
        + "ds-cfg-replication-server-id: 104\n";
    // suffix synchronized
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -81,6 +81,7 @@
         + "cn: replication Server\n"
         + "ds-cfg-replication-port: " + replServerPort + "\n"
         + "ds-cfg-replication-db-directory: HistoricalTest\n"
         + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
         + "ds-cfg-replication-server-id: 102\n";
    // The suffix to be synchronized.
@@ -489,7 +490,7 @@
    addEntriesWithHistorical(1, entryCnt);
    // leave a little delay between adding/modifying test entries
    // and configuring the purge delay.
    // and configuring the purge delay.
    Thread.sleep(10);
    // set the purge delay to 1 minute
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -80,7 +80,7 @@
  {
    RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
    ByteString data = parser.encodeRecord(msg.getChangeNumber(), msg);
    ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
    Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
    assertThat(record).isNotNull();
@@ -169,7 +169,10 @@
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  // TODO :: enable when purge is implemented with multi-files log
  // TODO : this works only if we ensure that there is a rotation of ahead log file
  // at the right place. First two records are 37 and 76 bytes long,
  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
  // Re-enable this test when max file size is customizable for log
  @Test(enabled=false)
  public void testPurge() throws Exception
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -95,7 +95,7 @@
  {
    RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
    ByteString data = parser.encodeRecord(msg.getCSN(), msg);
    ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg));
    Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
    assertThat(record).isNotNull();
@@ -264,7 +264,10 @@
    }
  }
  // TODO : enable when purge is enabled with multi-files log implementation
  // TODO : this works only if we ensure that there is a rotation of ahead log file
  // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
  // the last record alone in the ahead log file
  // Re-enable this test when max file size is customizable for log
  @Test(enabled=false)
  public void testPurge() throws Exception
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -35,7 +35,8 @@
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -45,11 +46,11 @@
@Test(sequential=true)
public class LogFileTest extends DirectoryServerTestCase
{
  private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
  private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog";
  private static final StringRecordParser RECORD_PARSER = new StringRecordParser();
  static final StringRecordParser RECORD_PARSER = new StringRecordParser();
  private static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
  static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
      @Override
      public Record<String, String> decodeRecord(ByteString data) throws DecodingException
      {
@@ -57,11 +58,18 @@
      }
  };
  @BeforeClass
  public void createTestDirectory()
  {
    File logDir = new File(TEST_DIRECTORY_CHANGELOG);
    logDir.mkdirs();
  }
  @BeforeMethod
  /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
  public void initialize() throws Exception
  {
    File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, LogFile.LOG_FILE_NAME);
    File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME);
    if (theLogFile.exists())
    {
      theLogFile.delete();
@@ -70,12 +78,12 @@
    for (int i = 1; i <= 10; i++)
    {
      logFile.addRecord("key"+i, "value"+i);
      logFile.append(Record.from("key"+i, "value"+i));
    }
    logFile.close();
  }
  @AfterMethod
  @AfterClass
  public void cleanTestChangelogDirectory()
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
@@ -87,8 +95,7 @@
  private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
  {
    LogFile<String, String> logFile = LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG), parser);
    return logFile;
    return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser);
  }
  @Test
@@ -266,7 +273,7 @@
      for (int i = 1; i <= 100; i++)
      {
        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
        writeLog.addRecord(record);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
@@ -279,35 +286,6 @@
    }
  }
  @Test()
  public void testTwoConcurrentWrite() throws Exception
  {
    final LogFile<String, String> writeLog1 = getLogFile(RECORD_PARSER);
    final LogFile<String, String> writeLog2 = getLogFile(RECORD_PARSER);
    try
    {
      writeLog1.addRecord(Record.from("startkey", "startvalue"));
      Thread write1 = getWriteLogThread(writeLog1, "a");
      Thread write2 = getWriteLogThread(writeLog2, "b");
      write1.run();
      write2.run();
      write1.join();
      write2.join();
      writeLog1.syncToFileSystem();
      DBCursor<Record<String, String>> cursor = writeLog1.getCursor("startkey");
      for (int i = 1; i <= 200; i++)
      {
         assertThat(cursor.next()).isTrue();
      }
      assertThat(cursor.getRecord()).isIn(Record.from("k-b100", "v-b100"), Record.from("k-a100", "v-a100"));
    }
    finally
    {
      StaticUtils.close(writeLog1, writeLog2);
    }
  }
  /**
   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
   * endIndex, using (keyN, valueN) where N is the index.
@@ -320,30 +298,13 @@
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
    }
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
    assertThatCursorIsExhausted(cursor);
  }
  /** Returns a thread that write 100 records to the provided log. */
  private Thread getWriteLogThread(final LogFile<String, String> writeLog, final String recordPrefix)
  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
  {
    return new Thread() {
      public void run()
      {
        for (int i = 1; i <= 100; i++)
        {
          Record<String, String> record = Record.from("k-" + recordPrefix + i, "v-" + recordPrefix + i);
          try
          {
            writeLog.addRecord(record);
          }
          catch (ChangelogException e)
          {
            e.printStackTrace();
          }
        }
      }
    };
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  /**
@@ -374,11 +335,31 @@
      return length;
    }
    public ByteString encodeRecord(String key, String value)
    public ByteString encodeRecord(Record<String, String> record)
    {
      return new ByteStringBuilder()
        .append(key).append(STRING_SEPARATOR)
        .append(value).append(STRING_SEPARATOR).toByteString();
        .append(record.getKey()).append(STRING_SEPARATOR)
        .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
    }
    @Override
    public String decodeKeyFromString(String key) throws ChangelogException
    {
      return key;
    }
    @Override
    public String encodeKeyToString(String key)
    {
      return key;
    }
    /** {@inheritDoc} */
    @Override
    public String getMaxKey()
    {
      // '~' character has the highest ASCII value
      return "~~~~";
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
New file
@@ -0,0 +1,571 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
import java.io.File;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
@Test(sequential=true)
public class LogTest extends DirectoryServerTestCase
{
  // Use a directory dedicated to this test class
  private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  @BeforeMethod
  public void initialize() throws Exception
  {
    // Delete any previous log
    if (LOG_DIRECTORY.exists())
    {
      StaticUtils.recursiveDelete(LOG_DIRECTORY);
    }
    // Build a log with 10 records with String keys and String values
    // Keys are using the format keyNNN where N is a figure
    // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly
    Log<String, String> log = openLog(RECORD_PARSER);
    for (int i = 1; i <= 10; i++)
    {
      log.append(Record.from(String.format("key%03d", i), "value" + i));
    }
    log.close();
  }
  private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
  {
    // Each string record has a length of approximately 18 bytes
    // This size is set in order to have 2 records per log file before the rotation happens
    // This allow to ensure rotation mechanism is thoroughly tested
    // Some tests rely on having 2 records per log file (especially the purge tests), so take care
    // if this value has to be changed
    int sizeLimitPerFileInBytes = 30;
    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
  }
  @Test
  public void testCursor() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnExistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor("key005");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnUnexistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      // key is between key005 and key006
      cursor = log.getCursor("key005000");
      assertThat(cursor).isNotNull();
      assertThat(cursor.getRecord()).isNull();
      assertThat(cursor.next()).isFalse();
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(null);
      // should start from first record
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testNearestCursorWhenGivenAnExistingKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null;
    try {
      // this key is the first key of the log file "key1_key2.log"
      cursor1 = log.getNearestCursor("key001");
      // lowest higher key is key2
      assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2"));
      assertThatCursorCanBeFullyRead(cursor1, 3, 10);
      // this key is the last key of the log file "key3_key4.log"
      cursor2 = log.getNearestCursor("key004");
      // lowest higher key is key5
      assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5"));
      assertThatCursorCanBeFullyRead(cursor2, 6, 10);
      cursor3 = log.getNearestCursor("key009");
      // lowest higher key is key10
      assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10"));
      assertThatCursorIsExhausted(cursor3);
    }
    finally {
      StaticUtils.close(cursor1, cursor2, cursor3, log);
    }
  }
  @Test
  public void testNearestCursorWhenGivenAnExistingKey_KeyIsTheLastOne() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getNearestCursor("key010");
      // lowest higher key does not exist
      assertThatCursorIsExhausted(cursor);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      // key is between key005 and key006
      cursor = log.getNearestCursor("key005000");
      // lowest higher key is key006
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6"));
      assertThatCursorCanBeFullyRead(cursor, 7, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testNearestCursorWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getNearestCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testCursorWhenParserFailsToRead() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ);
    try {
      log.getCursor("key");
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetOldestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    {
      Record<String, String> record = log.getOldestRecord();
      assertThat(record).isEqualTo(Record.from("key001", "value1"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetNewestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    {
      Record<String, String> record = log.getNewestRecord();
      assertThat(record).isEqualTo(Record.from("key010", "value10"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  /**
   * Test that changes are visible immediately to a reader after a write.
   */
  @Test
  public void testWriteAndReadOnSameLog() throws Exception
  {
    Log<String, String> writeLog = null;
    Log<String, String> readLog = null;
    try
    {
      writeLog = openLog(LogFileTest.RECORD_PARSER);
      readLog = openLog(LogFileTest.RECORD_PARSER);
      for (int i = 1; i <= 10; i++)
      {
        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
      }
    }
    finally
    {
      StaticUtils.close(writeLog, readLog);
    }
  }
  @Test
  public void testTwoConcurrentWrite() throws Exception
  {
    Log<String, String> writeLog1 = null;
    Log<String, String> writeLog2 = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      writeLog1 = openLog(LogFileTest.RECORD_PARSER);
      writeLog2 = openLog(LogFileTest.RECORD_PARSER);
      writeLog1.append(Record.from("key020", "starting record"));
      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
      write1.run();
      write2.run();
      write1.join();
      write2.join();
      if (exceptionRef.get() != null)
      {
        throw exceptionRef.get();
      }
      writeLog1.syncToFileSystem();
      cursor = writeLog1.getCursor("key020");
      for (int i = 1; i <= 60; i++)
      {
         assertThat(cursor.next()).isTrue();
      }
      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
    }
    finally
    {
      StaticUtils.close(cursor, writeLog1, writeLog2);
    }
  }
  /**
   *  This test should be disabled.
   *  Enable it locally when you need to have an rough idea of write performance.
   */
  @Test(enabled=false)
  public void logWriteSpeed() throws Exception
  {
    Log<String, String> writeLog = null;
    try
    {
      long sizeOf1MB = 1024*1024;
      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
      for (int i = 1; i < 1000000; i++)
      {
        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
      }
    }
    finally
    {
      StaticUtils.close(writeLog);
    }
  }
  @Test
  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      // advance cursor to last record to ensure it is pointing to ahead log file
      advanceCursorFromFirstRecordTo(cursor, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
      {
        log.append(Record.from(String.format("key%03d", i), "value" + i));
      }
      // check that cursor can fully read the new records
      assertThatCursorCanBeFullyRead(cursor, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor1 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor1, 1);
      cursor2 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor2, 4);
      cursor3 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor3, 9);
      cursor4 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor4, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
      {
        log.append(Record.from(String.format("key%03d", i), "value" + i));
      }
      // check that cursors can fully read the new records
      assertThatCursorCanBeFullyRead(cursor1, 2, 20);
      assertThatCursorCanBeFullyRead(cursor2, 5, 20);
      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
    }
  }
  @Test
  public void testClear() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.clear();
      cursor = log.getCursor();
      assertThatCursorIsExhausted(cursor);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
  @Test(enabled=false, expectedExceptions=ChangelogException.class)
  public void testClearWhenCursorIsOpened() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      log.clear();
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @DataProvider(name = "purgeKeys")
  Object[][] purgeKeys()
  {
    // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor
    return new Object[][]
    {
      // lowest key of the read-only log file "key005_key006.log"
      { "key005", Record.from("key005", "value5"), 6, 10},
      // key that is not the lowest of the read-only log file "key005_key006.log"
      { "key006", Record.from("key005", "value5"), 6, 10},
      // lowest key of the ahead log file "ahead.log"
      { "key009", Record.from("key009", "value9"), 10, 10},
      // key that is not the lowest of the ahead log file "ahead.log"
      { "key010", Record.from("key009", "value9"), 10, 10},
      // key not present in log, which is between key005 and key006
      { "key005a", Record.from("key005", "value5"), 6, 10},
      // key not present in log, which is between key006 and key007
      { "key006a", Record.from("key007", "value7"), 8, 10},
      // key not present in log, which is lower than oldest key key001
      { "key000", Record.from("key001", "value1"), 2, 10},
      // key not present in log, which is higher than newest key key010
      // should return the lowest key present in ahead log
      { "key011", Record.from("key009", "value9"), 10, 10},
    };
  }
  /**
   * Given a purge key, after purge is done, expects a new cursor to point on first record provided and
   * then to be fully read starting at provided start index and finishing at provided end index.
   */
  @Test(dataProvider="purgeKeys")
  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
      int cursorStartIndex, int cursorEndIndex) throws Exception
  {
    Log<String, String> log = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex)
      throws Exception
  {
    assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
    advanceCursorUpTo(cursor, 2, endIndex);
  }
  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    for (int i = fromIndex; i <= endIndex; i++)
    {
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i));
    }
  }
  /**
   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
   * endIndex, using (keyN, valueN) where N is the index.
   */
  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    advanceCursorUpTo(cursor, fromIndex, endIndex);
    assertThatCursorIsExhausted(cursor);
  }
  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
  {
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  /** Returns a thread that write N records to the provided log. */
  private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
      final AtomicReference<ChangelogException> exceptionRef)
  {
    return new Thread() {
      public void run()
      {
        for (int i = 1; i <= 30; i++)
        {
          Record<String, String> record = Record.from(
              String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
          try
          {
            writeLog.append(record);
          }
          catch (ChangelogException e)
          {
            // keep the first exception only
            exceptionRef.compareAndSet(null, e);
          }
        }
      }
    };
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -80,14 +80,15 @@
    final DN domainDN = DN.decode(DN1_AS_STRING);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    StaticUtils.close(cnDB, replicaDB, replicaDB2);
    ChangelogState state = environment.readChangelogState();
    assertThat(state.getDomainToServerIds()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(1, 2)));
    assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
    assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
    assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
  }
@@ -98,8 +99,8 @@
    List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    List<LogFile<CSN,UpdateMsg>> replicaDBs = new ArrayList<LogFile<CSN,UpdateMsg>>();
    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
    for (int i = 0; i <= 2 ; i++)
    {
      for (int j = 1; j <= 10; j++)
@@ -129,8 +130,8 @@
    File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    DN domainDN = DN.decode(DN1_AS_STRING);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    StaticUtils.close(replicaDB, replicaDB2);
    // delete the domain directory created for the 2 replica DBs to break the