mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
03.45.2014 2d735189c834108a2e5f7a795610372eb6d00aed
OPENDJ-1467 :  File Based Changelog must support replicas temporarily leaving the topology

* File-based changelog
Store the offline CSN when a replica goes offline
Read the offline CSN if present to build the changelog state at startup

Update ReplicationEnvironment.java to manage read and write from/to storage
Update ReplicationEnvironmentTest.java with more unit tests
Update replication.properties with new messages

* File and JE based changelog
Remove the offline CSN if present when receiving an heartbeat or an
update message

Update FileChangelogDB.java, JEChangelogDB.java, ReplicationDBEnv.java to
manage online replica notification

* Other minor updates : renaming, comments


10 files modified
522 ■■■■ changed files
opends/src/messages/messages/replication.properties 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 177 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 49 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java 200 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -572,8 +572,8 @@
 to file system for log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
 on log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
 log file
SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
 directory '%s' for log file
SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
 from domain state file '%s', from line '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -605,3 +605,13 @@
 head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
 size of head log file is %d bytes
SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
 offline for domain %s and server id %d because the path '%s' does not exist
SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
 replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
 state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
 replica offline state file '%s' for domain %s
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
 offline state file '%s' for domain %s and server id %d
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2507,7 +2507,7 @@
    {
      if (msg.isReplicaOfflineMsg())
      {
        domainDB.replicaOffline(baseDN, msg.getCSN());
        domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
      }
      else
      {
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -166,8 +166,10 @@
   * @param heartbeatCSN
   *          The CSN heartbeat sent by this replica (contains the serverId and
   *          timestamp of the heartbeat)
   * @throws ChangelogException
   *            If a database problem happened
   */
  void replicaHeartbeat(DN baseDN, CSN heartbeatCSN);
  void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException;
  /**
   * Let the DB know this replica is going down.
@@ -186,5 +188,5 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
  void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -672,6 +672,7 @@
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
@@ -679,25 +680,35 @@
  /** {@inheritDoc} */
  @Override
  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN)
  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
  {
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
      throws ChangelogException
  {
    if (indexer.isReplicaOffline(baseDN, serverId))
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void replicaOffline(final DN baseDN, final CSN offlineCSN)
  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    // TODO save this state in the changelogStateDB?
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
  /**
   * The last key appended to the log. In order to keep the ordering of the keys
   * in the log, any attempt to append a record with a key lower or equal to
   * this key will silently fail.
   * this is rejected (no error but an event is logged).
   */
  private K lastAppendedKey;
@@ -342,6 +342,10 @@
  /**
   * Add the provided record at the end of this log.
   * <p>
   * The record must have a key strictly higher than the key
   * of the last record added. If it is not the case, the record is not
   * appended and the method returns immediately.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
@@ -349,7 +353,7 @@
   * @param record
   *          The record to add.
   * @throws ChangelogException
   *           If the record can't be added to the log.
   *           If an error occurs while adding the record to the log.
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
@@ -766,7 +770,7 @@
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
    }
  }
@@ -842,8 +846,8 @@
  private void openHeadLogFile() throws ChangelogException
  {
    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
    Record<K,V> newestRecord = head.getNewestRecord();
    lastAppendedKey = newestRecord == null ? null : newestRecord.getKey();
    final Record<K,V> newestRecord = head.getNewestRecord();
    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
    logFiles.put(recordParser.getMaxKey(), head);
  }
@@ -1011,9 +1015,9 @@
        if (key != null)
        {
          boolean isFound = currentCursor.positionTo(key, findNearest);
          if (isFound && getRecord() == null)
          if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile))
          {
            // The key to position to may be in the next file, force the switch
            // The key to position is probably in the next file, force the switch
            isFound = next();
          }
          return isFound;
@@ -1047,6 +1051,13 @@
      currentLogFile = logFile;
      currentCursor = currentLogFile.getCursor();
    }
    /** {@inheritDoc} */
    public String toString()
    {
      return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
          log.logPath, currentLogFile.getFile().getName(), currentCursor);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,15 +79,21 @@
 * <ul>
 * <li>A "generation_[id].id" file, where [id] is the generation id</li>
 * <li>One directory per server id, named after "[id].server" where [id] is the
 * id of the server. Each directory contains the log files for the given server
 * id.</li>
 * id of the server.</li>
 * </ul>
 * All log files end with the ".log" suffix. Log files always include the "head.log"
 * file and optionally zero to many read-only log files named after the lowest key
 * and highest key present in the log file.
 * Each server id directory contains the following files :
 * <ul>
 * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
 * <li>Zero to many read-only log files named after the lowest key
 * and highest key present in the log file (they all end with the ".log" suffix.</li>
 * <li>Optionally, a "offline.state" file that indicates that this particular server id
 *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
 * </ul>
 * See {@code Log} class for details on the log files.
 *
 * <p>
 * Layout example with two domains "o=test1" and "o=test2", each having server
 * ids 22 and 33 :
 * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
 *
 * <pre>
 * +---changelog
@@ -96,15 +104,16 @@
 * |   \---1.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 *             \---offline.state
 * |   \---2.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * </pre>
 */
class ReplicationEnvironment
@@ -120,6 +129,8 @@
  private static final String DOMAINS_STATE_FILENAME = "domains.state";
  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
  private static final String DOMAIN_STATE_SEPARATOR = ":";
  private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
  private static final String GENERATION_ID_FILE_SUFFIX = ".id";
  private static final String UTF8_ENCODING = "UTF-8";
  private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
  {
    @Override
@@ -169,7 +182,6 @@
  private final Map<DN, String> domains = new HashMap<DN, String>();
  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
  // TODO : review the usefulness of this lock
  private final Object domainLock = new Object();
  /** The underlying replication server. */
@@ -358,6 +370,77 @@
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and provided CSN
   * is offline.
   *
   * @param domainDN
   *          the domain of the offline replica
   * @param offlineCSN
   *          the offline replica serverId and offline timestamp
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
  {
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
      }
      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
      Writer writer = null;
      try
      {
        // Overwrite file, only the last sent offline CSN is kept
        writer = newFileWriter(offlineFile);
        writer.write(offlineCSN.toString());
      }
      catch (IOException e)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
      }
      finally
      {
        StaticUtils.close(writer);
      }
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and server id
   * is online.
   *
   * @param domainDN
   *          the domain of the replica
   * @param serverId
   *          the replica serverId
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
  {
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
      {
        final boolean isDeleted = offlineFile.delete();
        if (!isDeleted)
        {
          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
              offlineFile.getPath(), domainDN.toString(), serverId));
        }
      }
    }
  }
  /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
  private void readDomainsStateFile() throws ChangelogException
  {
@@ -368,7 +451,7 @@
      String line = null;
      try
      {
        reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
        reader = newFileReader(domainsStateFile);
        while ((line = reader.readLine()) != null)
        {
          final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
   * Update the changelog state with the state corresponding to the provided
   * domain DN.
   */
  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
      throws ChangelogException
  {
    final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
          replicationRootPath, domainDirectory.getPath()));
    }
    final DN domainDN = domainEntry.getKey();
    result.setDomainGenerationId(domainDN, toGenerationId(generationId));
    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
    final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
    if (serverIds == null)
@@ -446,7 +529,43 @@
    }
    for (final File serverId : serverIds)
    {
      result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
      readStateForServerId(domainDN, serverId, state);
    }
  }
  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
  {
    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
    if (offlineFile.exists())
    {
      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
      state.addOfflineReplica(domainDN, offlineCSN);
    }
  }
  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
  {
    BufferedReader reader = null;
    try
    {
      reader = newFileReader(offlineFile);
      String line = reader.readLine();
      if (line == null || reader.readLine() != null)
      {
        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineFile.getPath()));
      }
      return new CSN(line);
    }
    catch(IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
          domainDN.toString(), offlineFile.getPath()), e);
    }
    finally {
      StaticUtils.close(reader);
    }
  }
@@ -458,13 +577,13 @@
    Writer writer = null;
    try
    {
      writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
      writer = newFileWriter(domainsStateFile);
      for (final Entry<DN, String> entry : domains.entrySet())
      {
        writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
      }
    }
    catch (Exception e)
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
          domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
    return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
  }
  private File getServerIdPath(final String domainId, final int serverId)
  /**
   * Return the path for the provided domain id and server id.
   * Package private to be usable in tests.
   *
   * @param domainId
   *            The id corresponding to a domain DN
   * @param serverId
   *            The server id to retrieve
   * @return the path
   */
  File getServerIdPath(final String domainId, final int serverId)
  {
    return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
  }
@@ -673,4 +802,16 @@
      throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
    }
  }
  /** Returns a buffered writer on the provided file. */
  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
  }
  /** Returns a buffered reader on the provided file. */
  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -187,6 +187,21 @@
  }
  /**
   * Indicates if the replica corresponding to provided domain DN and server id
   * is offline.
   *
   * @param domainDN
   *          base DN of the replica
   * @param serverId
   *          server id of the replica
   * @return {@code true} if replica is offline, {@code false} otherwise
   */
  public boolean isReplicaOffline(DN domainDN, int serverId)
  {
    return replicasOffline.getCSN(domainDN, serverId) != null;
  }
  /**
   * Ensures the medium consistency point is updated by UpdateMsg.
   *
   * @param baseDN
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -756,6 +756,7 @@
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
@@ -763,21 +764,31 @@
  /** {@inheritDoc} */
  @Override
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException
  {
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
      throws ChangelogException
  {
    if (indexer.isReplicaOffline(baseDN, serverId))
    {
      dbEnv.notifyReplicaOnline(baseDN, serverId);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    dbEnv.addOfflineReplica(baseDN, offlineCSN);
    dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -505,14 +505,32 @@
   */
  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
  {
    final byte[] key =
        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
            + FIELD_SEPARATOR + baseDN.toNormalizedString());
    final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
    data.append(offlineCSN.getTime());
    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
  }
  /**
   * Return the key for a replica offline entry in the changelog state database.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param serverId
   *          the replica's serverId
   * @return the key used in the database to store offline time of the replica
   */
  private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
  {
    return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
  }
  /** Returns an entry with the provided key and a null value. */
  private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
  {
    return new SimpleImmutableEntry<byte[], byte[]>(key, null);
  }
  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
      throws ChangelogException, RuntimeException
  {
@@ -689,7 +707,9 @@
  }
  /**
   * Add the information about an offline replica to the changelog state DB.
   * Notify that replica is offline.
   * <p>
   * This information is stored in the changelog state DB.
   *
   * @param baseDN
   *          the domain of the offline replica
@@ -698,7 +718,7 @@
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
@@ -707,6 +727,25 @@
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
  }
  /**
   * Notify that replica is online.
   * <p>
   * Update the changelog state DB if necessary (ie, replica was known to be
   * offline).
   *
   * @param baseDN
   *          the domain of replica
   * @param serverId
   *          the serverId of replica
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
      String methodInvocation) throws ChangelogException
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -26,8 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
import java.io.Closeable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,12 +37,14 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -50,6 +52,9 @@
@SuppressWarnings("javadoc")
public class ReplicationEnvironmentTest extends DirectoryServerTestCase
{
  private static final int SERVER_ID_1 = 1;
  private static final int SERVER_ID_2 = 2;
  private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
  private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
  private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
@@ -74,42 +79,50 @@
  }
  @Test
  public void testCreateThenReadChangelogStateWithSingleDN() throws Exception
  public void testReadChangelogStateWithSingleDN() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
    try
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    final DN domainDN = DN.decode(DN1_AS_STRING);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    StaticUtils.close(cnDB, replicaDB, replicaDB2);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
    ChangelogState state = environment.readChangelogState();
    assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
    assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
    assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
  }
    finally
    {
      StaticUtils.close(cnDB, replicaDB, replicaDB2);
    }
  }
  @Test
  public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception
  public void testReadChangelogStateWithMultipleDN() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
    try
  {
    File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
      cnDB = environment.getOrCreateCNIndexDB();
    for (int i = 0; i <= 2 ; i++)
    {
      for (int j = 1; j <= 10; j++)
      {
          // 3 domains, 10 server id each, generation id is different for each domain
        replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
      }
    }
    StaticUtils.close(cnDB);
    StaticUtils.close(replicaDBs.toArray(new Closeable[] {}));
    ChangelogState state = environment.readChangelogState();
@@ -123,16 +136,164 @@
        MapEntry.entry(domainDNs.get(1), 2L),
        MapEntry.entry(domainDNs.get(2), 3L));
  }
    finally
    {
      StaticUtils.close(cnDB);
      StaticUtils.close(replicaDBs);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOffline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.decode(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      ChangelogState state = environment.readChangelogState();
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.decode(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
      offlineStateFile.createNewFile();
      environment.readChangelogState();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.decode(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline twice
      CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
      environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
      CSN lastOfflineCSN = csnGenerator.newCSN();
      environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
      ChangelogState state = environment.readChangelogState();
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.decode(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      // put server id 1 offline
      environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
      // put server id 1 online again
      environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
      ChangelogState state = environment.readChangelogState();
      assertThat(state.getOfflineReplicas()).isEmpty();
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test
  public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null;
    try
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.decode(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      ChangelogState state = environment.readChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
    }
    finally
    {
      StaticUtils.close(cnDB, replicaDB);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testMissingDomainDirectory() throws Exception
  {
    Log<Long,ChangeNumberIndexRecord> cnDB = null;
    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
    try
    {
    File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    DN domainDN = DN.decode(DN1_AS_STRING);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
    StaticUtils.close(replicaDB, replicaDB2);
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
    // delete the domain directory created for the 2 replica DBs to break the
    // consistency with domain state file
@@ -140,4 +301,9 @@
    environment.readChangelogState();
  }
    finally
    {
      StaticUtils.close(cnDB, replicaDB, replicaDB2);
    }
  }
}