mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.19.2014 6e14a8394d193af0fa32b83d3cc424787d41eadd
opends/src/messages/messages/replication.properties
@@ -533,9 +533,6 @@
 change %s to replicaDB %s %s because flushing thread is shutting down
SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
 state from root path '%s' : directory might not exist
SEVERE_ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when retrieving \
 changelog state from root path '%s' : no generation id file found in domain \
 directory '%s'
SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
 changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \
 list of server ids
@@ -603,8 +600,6 @@
 head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
 size of head log file is %d bytes
SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
 offline for domain %s and server id %d because the path '%s' does not exist
SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
 replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2012,6 +2012,13 @@
    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
  }
  /** {@inheritDoc} */
  @Override
  public void publishReplicaOfflineMsg()
  {
    pendingChanges.putReplicaOfflineMsg();
  }
  /**
   * Check if an operation must be synchronized.
   * Also update the list of pending changes and the server RUV
opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.operation.PluginOperation;
/**
@@ -39,7 +40,7 @@
{
  private final CSN csn;
  private boolean committed;
  private LDAPUpdateMsg msg;
  private UpdateMsg msg;
  private final PluginOperation op;
  private ServerState dependencyState;
@@ -49,7 +50,7 @@
   * @param op the operation to use
   * @param msg the message to use (can be null for local operations)
   */
  PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
  PendingChange(CSN csn, PluginOperation op, UpdateMsg msg)
  {
    this.csn = csn;
    this.committed = false;
@@ -89,12 +90,27 @@
   * @return the message if operation was a replication operation
   * null if the operation was a local operation
   */
  public LDAPUpdateMsg getMsg()
  public UpdateMsg getMsg()
  {
    return msg;
  }
  /**
   * Get the LDAPUpdateMsg associated to this PendingChange.
   *
   * @return the LDAPUpdateMsg if operation was a replication operation, null
   *         otherwise
   */
  public LDAPUpdateMsg getLDAPUpdateMsg()
  {
    if (msg instanceof LDAPUpdateMsg)
    {
      return (LDAPUpdateMsg) msg;
    }
    return null;
  }
  /**
   * Set the message associated to the PendingChange.
   * @param msg the message
   */
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -33,6 +33,8 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.operation.PluginOperation;
@@ -136,6 +138,20 @@
  }
  /**
   * Add a replica offline message to the pending list.
   */
  public synchronized void putReplicaOfflineMsg()
  {
    final CSN offlineCSN = csnGenerator.newCSN();
    final PendingChange pendingChange =
        new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN));
    pendingChange.setCommitted(true);
    pendingChanges.put(offlineCSN, pendingChange);
    pushCommittedChanges();
  }
  /**
   * Push all committed local changes to the replicationServer service.
   */
  synchronized void pushCommittedChanges()
@@ -152,20 +168,26 @@
    while (firstChange != null && firstChange.isCommitted())
    {
      final PluginOperation op = firstChange.getOp();
      if (op != null && !op.isSynchronizationOperation())
      final UpdateMsg msg = firstChange.getMsg();
      if (msg instanceof LDAPUpdateMsg
          && op != null
          && !op.isSynchronizationOperation())
      {
        final LDAPUpdateMsg updateMsg = firstChange.getMsg();
        if (!recoveringOldChanges)
        {
          domain.publish(updateMsg);
          domain.publish(msg);
        }
        else
        {
          // do not push updates until the RS catches up.
          // @see #setRecovering(boolean)
          domain.getServerState().update(updateMsg.getCSN());
          domain.getServerState().update(msg.getCSN());
        }
      }
      else if (msg instanceof ReplicaOfflineMsg)
      {
        domain.publish(msg);
      }
      // false warning: firstEntry will not be null if firstChange is not null
      pendingChanges.remove(firstEntry.getKey());
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -157,7 +157,7 @@
      if (change.dependenciesIsCovered(state))
      {
        dependentChanges.remove(change);
        return change.getMsg();
        return change.getLDAPUpdateMsg();
      }
    }
    return null;
@@ -208,7 +208,7 @@
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
@@ -300,7 +300,7 @@
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg instanceof AddMsg)
        {
          // Check if the operation to be run is an addOperation on a same DN.
@@ -350,13 +350,13 @@
      return false;
    }
    final DN targetDN = change.getMsg().getDN();
    final DN targetDN = change.getLDAPUpdateMsg().getDN();
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
@@ -442,7 +442,7 @@
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      {
        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -99,7 +99,7 @@
  /**
   * The constant for the 8th version of the replication protocol.
   * <ul>
   * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
   * <li>New ReplicaOfflineMsg.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V8 = 8;
opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
New file
@@ -0,0 +1,113 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.CSN;
import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
/**
 * Class that define messages sent by a replica (DS) to the replication server
 * (RS) to let the RS know the date at which a replica went offline.
 */
public class ReplicaOfflineMsg extends UpdateMsg
{
  /**
   * Constructor of a replica offline message providing the offline timestamp in
   * a CSN.
   *
   * @param offlineCSN
   *          the provided offline CSN
   */
  public ReplicaOfflineMsg(final CSN offlineCSN)
  {
    super(offlineCSN, new byte[0]);
  }
  /**
   * Creates a message by deserializing it from the provided byte array.
   *
   * @param in
   *          The provided byte array.
   * @throws DataFormatException
   *           When an error occurs during decoding .
   */
  public ReplicaOfflineMsg(byte[] in) throws DataFormatException
  {
    try
    {
      final ByteArrayScanner scanner = new ByteArrayScanner(in);
      final byte msgType = scanner.nextByte();
      if (msgType != MSG_TYPE_REPLICA_OFFLINE)
      {
        throw new DataFormatException("input is not a valid "
            + getClass().getSimpleName() + " message: " + msgType);
      }
      protocolVersion = scanner.nextShort();
      csn = scanner.nextCSN();
      if (!scanner.isEmpty())
      {
        throw new DataFormatException(
            "Did not expect to find more bytes to read for "
                + getClass().getSimpleName());
      }
    }
    catch (RuntimeException e)
    {
      // Index out of bounds, bad format, etc.
      throw new DataFormatException("byte[] is not a valid "
          + getClass().getSimpleName());
    }
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    final ByteArrayBuilder builder = new ByteArrayBuilder(size());
    builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
    builder.appendShort(protocolVersion);
    builder.appendCSN(csn);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public int size()
  {
    return bytes(1) + shorts(1) + csns(1);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " offlineCSN=" + csn;
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -85,6 +85,9 @@
  //   EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg
  //   TopologyMsg
  /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */
  static final byte MSG_TYPE_REPLICA_OFFLINE = 37;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -199,6 +202,8 @@
      return new StopMsg(buffer);
    case MSG_TYPE_INITIALIZE_RCV_ACK:
      return new InitializeRcvAckMsg(buffer);
    case MSG_TYPE_REPLICA_OFFLINE:
      return new ReplicaOfflineMsg(buffer);
    default:
      throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -223,7 +223,13 @@
      {
        final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
        if (newMsg == null)
        if (newMsg instanceof ReplicaOfflineMsg)
        {
          // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog
          // proceed as if it was never returned
          continue;
        }
        else if (newMsg == null)
        { // in non blocking mode, null means no more messages
          return null;
        }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -496,6 +496,13 @@
  {
    try
    {
      if (updateMsg instanceof ReplicaOfflineMsg)
      {
        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
        return true;
      }
      if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
      {
        /*
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -388,7 +389,7 @@
  {
    if (debugEnabled())
    {
      TRACER.debugError("clear the FileChangelogDB");
      TRACER.debugInfo("clear the FileChangelogDB");
    }
    if (!dbDirectory.exists())
    {
@@ -631,18 +632,39 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = replicationEnv.readChangelogState();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    {
      for (CSN offlineCSN : domain)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
        {
          return offlineCSN;
        }
      }
    }
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -334,9 +334,13 @@
   */
  void clearGenerationId(final DN domainDN) throws ChangelogException
  {
    synchronized(domainLock)
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
      }
      final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
      if (idFile != null)
      {
@@ -365,6 +369,10 @@
    {
      clearGenerationId(baseDN);
      final String domainId = domains.get(baseDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
      }
      final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
      ensureGenerationIdFileExists(generationIdPath);
    }
@@ -386,11 +394,14 @@
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
      }
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
        return; // no serverId anymore => no-op
      }
      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
      Writer writer = null;
@@ -428,6 +439,10 @@
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
      }
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
      {
@@ -512,14 +527,12 @@
      throws ChangelogException
  {
    final File domainDirectory = getDomainPath(domainEntry.getValue());
    final String generationId = retrieveGenerationId(domainDirectory);
    if (generationId == null)
    {
      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND.get(
          replicationRootPath, domainDirectory.getPath()));
    }
    final DN domainDN = domainEntry.getKey();
    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
    final String generationId = retrieveGenerationId(domainDirectory);
    if (generationId != null)
    {
      state.setDomainGenerationId(domainDN, toGenerationId(generationId));
    }
    final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
    if (serverIds == null)
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -707,18 +707,39 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = dbEnv.readChangelogState();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    {
      for (CSN offlineCSN : domain)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
        {
          return offlineCSN;
        }
      }
    }
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
New file
@@ -0,0 +1,126 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
/**
 * Implementation of a DBCursor that decorates an existing DBCursor
 * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
 * and the offline CSN is newer than the last returned update CSN.
 */
public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
{
  /** @NonNull */
  private final DBCursor<UpdateMsg> cursor;
  private ReplicaOfflineMsg replicaOfflineMsg;
  /**
   * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
   */
  private boolean returnReplicaOfflineMsg;
  /**
   * Creates a ReplicaOfflineCursor object with a cursor to decorate
   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
   *
   * @param cursor
   *          the non-null underlying cursor that needs to be exhausted before
   *          we return a ReplicaOfflineMsg
   * @param offlineCSN
   *          The offline CSN from which to builder the
   *          {@link ReplicaOfflineMsg} to return
   */
  public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
  {
    this.replicaOfflineMsg =
        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
    this.cursor = cursor;
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    if (returnReplicaOfflineMsg)
    {
      // already consumed, never return it again...
      replicaOfflineMsg = null;
      returnReplicaOfflineMsg = false;
      // ...and verify if new changes have been added to the DB
      // (cursors are automatically restarted)
    }
    final UpdateMsg lastUpdate = cursor.getRecord();
    final boolean hasNext = cursor.next();
    if (hasNext)
    {
      return true;
    }
    if (replicaOfflineMsg == null)
    { // no ReplicaOfflineMsg to return
      return false;
    }
    // replicaDB just happened to be exhausted now
    if (lastUpdate != null
        && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
    {
      // offlineCSN is outdated, never return it
      replicaOfflineMsg = null;
      return false;
    }
    returnReplicaOfflineMsg = true;
    return true;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName()
        + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
        + " offlineCSN="
        + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
        + " cursor=" + cursor.toString().split("", 2)[1];
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2802,6 +2802,7 @@
    synchronized (startStopLock)
    {
      domain.publishReplicaOfflineMsg();
      shutdown = true;
      setConnectedRS(ConnectedRS.stopped());
      stopRSHeartBeatMonitoring();
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3450,6 +3450,15 @@
  }
  /**
   * Publishes a replica offline message if all pending changes for current
   * replica have been sent out.
   */
  public void publishReplicaOfflineMsg()
  {
    // Here to be overridden
  }
  /**
   * Publish information to the Replication Service (not assured mode).
   *
   * @param msg  The byte array containing the information that should
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -843,6 +843,17 @@
    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
  }
  @Test
  public void replicaOfflineMsgTest() throws Exception
  {
    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
    final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn);
    final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8);
    ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes);
    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
  }
  /**
   * Test that WindowMsg encoding and decoding works
   * by checking that : msg == new WindowMsg(msg.getBytes()).
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -29,7 +29,6 @@
import java.util.Map;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -56,10 +55,10 @@
  @BeforeClass
  public void setupMsgs()
  {
    msg1 = newUpdateMsg(1);
    msg2 = newUpdateMsg(2);
    msg3 = newUpdateMsg(3);
    msg4 = newUpdateMsg(4);
    msg1 = new FakeUpdateMsg(1);
    msg2 = new FakeUpdateMsg(2);
    msg3 = new FakeUpdateMsg(3);
    msg4 = new FakeUpdateMsg(4);
  }
  @Test
@@ -134,19 +133,6 @@
        of(msg4, baseDN1));
  }
  private UpdateMsg newUpdateMsg(final int t)
  {
    return new UpdateMsg(new CSN(t, t, t), new byte[t])
    {
      /** {@inheritDoc} */
      @Override
      public String toString()
      {
        return "UpdateMsg(" + t + ")";
      }
    };
  }
  private CompositeDBCursor<String> newCompositeDBCursor(
      Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
New file
@@ -0,0 +1,47 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
@SuppressWarnings("javadoc")
class FakeUpdateMsg extends UpdateMsg
{
  private final int t;
  FakeUpdateMsg(int t)
  {
    super(new CSN(t, t, t), new byte[1]);
    this.t = t;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return "UpdateMsg(" + t + ")";
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
New file
@@ -0,0 +1,129 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
/**
 * Test the ReplicaOfflineCursor class.
 */
@SuppressWarnings("javadoc")
public class ReplicaOfflineCursorTest extends ReplicationTestCase
{
  private int timestamp;
  private DBCursor<UpdateMsg> delegateCursor;
  @BeforeTest
  public void init()
  {
    timestamp = 1;
  }
  @Test
  public void cursorReturnsFalse() throws Exception
  {
    delegateCursor = new SequentialDBCursor();
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  @Test
  public void cursorReturnsTrue() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  @Test
  public void cursorReturnsReplicaOfflineMsg() throws Exception
  {
    delegateCursor = new SequentialDBCursor();
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    final UpdateMsg record = cursor.getRecord();
    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
    assertThat(record.getCSN()).isEqualTo(offlineCSN);
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  @Test
  public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
  {
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
    assertThat(cursor.next()).isTrue();
    final UpdateMsg record = cursor.getRecord();
    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
    assertThat(record.getCSN()).isEqualTo(offlineCSN);
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
  @Test
  public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception
  {
    final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(null, updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
  }
}