mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
27.28.2010 a5c5efbf8ca56c059709953f7fedb647dadaed06
opends/resource/schema/02-config.ldif
@@ -2463,6 +2463,11 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.607
  NAME 'ds-cfg-initialization-window-size'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3026,7 +3031,8 @@
        ds-cfg-fractional-exclude $
        ds-cfg-fractional-include $
        ds-cfg-solve-conflicts $
        ds-cfg-changetime-heartbeat-interval )
        ds-cfg-changetime-heartbeat-interval $
        ds-cfg-initialization-window-size )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.58
  NAME 'ds-cfg-length-based-password-validator'
opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  !      Copyright 2007-2010 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="replication-domain"
  plural-name="replication-domains"
@@ -495,4 +495,23 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="initialization-window-size">
    <adm:synopsis>
      Specifies the window size that this Directory Server may use when
      communicating with remote Directory Servers for initialization.
    </adm:synopsis>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>100</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-initialization-window-size</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/admin/messages/ReplicationDomainCfgDefn.properties
@@ -23,6 +23,7 @@
property.group-id.description=This value defines the group ID of the replicated domain. The replication system will preferably connect and send updates to replicate to a replication server with the same group ID as its own one (the local server group ID).
property.heartbeat-interval.synopsis=Specifies the heart-beat interval that the Directory Server will use when communicating with Replication Servers.
property.heartbeat-interval.description=The Directory Server expects a regular heart-beat coming from the Replication Server within the specified interval. If a heartbeat is not received within the interval, the Directory Server closes its connection and connects to another Replication Server.
property.initialization-window-size.synopsis=Specifies the window size that this Directory Server may use when communicating with remote Directory Servers for initialization.
property.isolation-policy.synopsis=Specifies the behavior of the Directory Server if a write operation is attempted on the data within the Replication Domain when none of the configured Replication Servers are available.
property.isolation-policy.syntax.enumeration.value.accept-all-updates.synopsis=Indicates that updates should be accepted even though it is not possible to send them to any Replication Server. Best effort is made to re-send those updates to a Replication Servers when one of them is available, however those changes are at risk because they are only available from the historical information. This mode can also introduce high replication latency.
property.isolation-policy.syntax.enumeration.value.reject-all-updates.synopsis=Indicates that all updates attempted on this Replication Domain are rejected when no Replication Server is available.
opends/src/messages/messages/replication.properties
@@ -142,9 +142,12 @@
 (or never purge)
SEVERE_ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED_44=The current request is \
 rejected due to an import or an export already in progress for the same data
SEVERE_ERR_INVALID_IMPORT_SOURCE_45=Invalid source for the import
SEVERE_ERR_INVALID_IMPORT_SOURCE_45=On domain %s, initialization of server \
 with serverId:%s has been requested from a server with an invalid \
 serverId:%s. %s
SEVERE_ERR_INVALID_EXPORT_TARGET_46=Invalid target for the export
SEVERE_ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN_47=No reachable peer in the domain
SEVERE_ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN_47=Domain %s: the server with \
 serverId=%s is unreachable
SEVERE_ERR_NO_MATCHING_DOMAIN_48=No domain matches the provided base DN '%s'
SEVERE_ERR_MULTIPLE_MATCHING_DOMAIN_49=Multiple domains match the base DN \
 provided
@@ -277,9 +280,12 @@
 from the server with server ID %s too late and are ignored
NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \
updated with changeNumber %s
SEVERE_ERR_RESET_GENERATION_CONN_ERR_ID_118=The generation ID could not be \
reset for domain %s because it is NOT connected to the replication. You should \
check in the configuration that the domain is enabled
SEVERE_ERR_RESET_GENERATION_CONN_ERR_ID_118=For replicated domain %s, in \
server with serverId=%s, the generation ID could not be set to value %s \
in the rest of the topology because this server is NOT connected to \
any replication server. You should \
check in the configuration that the domain is enabled and that there is one \
replication server up and running
SEVERE_ERR_EXCEPTION_STARTING_SESSION_PHASE_119=Caught Exception during initial \
 communication (phase %s) on domain %s with replication server %s : %s
NOTICE_NEW_SERVER_WITH_SAME_GROUP_ID_120=Disconnecting from replication server \
@@ -335,13 +341,13 @@
 directory server %s
NOTICE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END_142=Local directory server %s has \
 finished online full update for importing suffix %s data from remote \
 directory server %s
 directory server %s. %s
NOTICE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_143=Local directory server %s is \
 starting online full update for exporting suffix %s data to remote directory \
 server %s
 starting online full update for exporting %s entries from suffix %s \
 to remote directory server %s
NOTICE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_144=Local directory server %s has \
 finished online full update for exporting suffix data %s to remote directory \
 server %s
 server %s. %s
NOTICE_TIMEOUT_WHEN_CROSS_CONNECTION_145=Timed out trying to acquire the domain \
lock for %s. Connection attempt from replication server %s to local replication \
server %s will be aborted. Simultanate cross connection attempt ?
@@ -450,4 +456,36 @@
SEVERE_ERR_INVALID_COOKIE_SYNTAX_187=Invalid syntax of the provided cookie
NOTICE_NEW_BEST_REPLICATION_SERVER_188=Domain %s (server id: %s) : \
 disconnecting from this replication server (server id: %s, url: %s) : as a \
 new one is more suitable
 new one %s is more suitable
MILD_ERR_INIT_EXPORTER_DISCONNECTION_189=Domain %s (server id: %s) : \
 remote exporter server disconnection (server id: %s ) detected during \
 initialization
SEVERE_ERR_INIT_IMPORT_FAILURE_190=\
 During initialization from a remote server, the following error occurred : %s
SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\
 Connection failure with Replication Server %s during import
SEVERE_ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=\
 Bad msg id sequence during import. Expected:%s Actual:%s
SEVERE_ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\
 The following servers did not acknowledged initialization in the expected \
 time. They are potentially down or too slow. Servers list: %s
SEVERE_ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\
 The following servers did not end initialization being connected with the \
 right generation (%s). They are potentially stopped or too slow. \
 Servers list: %s
SEVERE_ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=\
 When initializing remote server(s), connection to Replication Server with \
 serverId=%s is lost
SEVERE_ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=\
 When initializing remote server(s), the initialized server with serverId=%s \
 is potentially stopped or too slow
SEVERE_ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST_197=\
 When sending a new initialization request for an initialization from a remote \
 server, the following error occured %s. The initial error was : %s
NOTICE_RESENDING_INIT_FROM_REMOTE_REQUEST_198=\
 Resending a new initialization request for an initialization from a remote \
 server due to the root error : %s
NOTICE_RESENDING_INIT_TARGET_199=\
 Resending a new initialization start for an initialization of a remote server \
 due to the root error : %s
opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -58,6 +58,8 @@
  private List<String> refUrls = new ArrayList<String>(0);
  // Group id
  private byte groupId = (byte) -1;
  // Protocol version
  private short protocolVersion = -1;
  private Set<String> eclIncludes = new HashSet<String>();
@@ -74,10 +76,12 @@
   * @param groupId DS group id
   * @param refUrls DS exported referrals URLs
   * @param eclIncludes The list of entry attributes to include in the ECL.
   * @param protocolVersion Protocol version supported by this server.
   */
  public DSInfo(int dsId, int rsId, long generationId, ServerStatus status,
    boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel,
    byte groupId, List<String> refUrls, Set<String> eclIncludes)
    byte groupId, List<String> refUrls, Set<String> eclIncludes,
    short protocolVersion)
  {
    this.dsId = dsId;
@@ -90,6 +94,7 @@
    this.groupId = groupId;
    this.refUrls = refUrls;
    this.eclIncludes = eclIncludes;
    this.protocolVersion = protocolVersion;
  }
  /**
@@ -175,7 +180,7 @@
  /**
   * Get the entry attributes to be included in the ECL.
   * @return a.
   * @return The entry attributes to be included in the ECL.
   */
  public Set<String> getEclIncludes()
  {
@@ -183,6 +188,16 @@
  }
  /**
   * Get the protocol version supported by this server.
   * Returns -1 when the protocol version is not known (too old version).
   * @return The protocol version.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
  /**
   * Test if the passed object is equal to this one.
   * @param obj The object to test
   * @return True if both objects are equal
@@ -205,6 +220,7 @@
        (assuredMode == dsInfo.getAssuredMode()) &&
        (safeDataLevel == dsInfo.getSafeDataLevel()) &&
        (groupId == dsInfo.getGroupId()) &&
        (protocolVersion == dsInfo.getProtocolVersion()) &&
        (refUrls.equals(dsInfo.getRefUrls())) &&
         (((eclIncludes == null) && (dsInfo.getEclIncludes() == null)) ||
           ((eclIncludes != null) &&
@@ -234,6 +250,7 @@
    hash = 73 * hash + (this.refUrls != null ? this.refUrls.hashCode() : 0);
    hash = 73 * hash + (this.eclIncludes != null ? eclIncludes.hashCode() : 0);
    hash = 73 * hash + this.groupId;
    hash = 73 * hash + this.protocolVersion;
    return hash;
  }
@@ -261,6 +278,8 @@
    sb.append(safeDataLevel);
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" ; Protocol version: ");
    sb.append(protocolVersion);
    sb.append(" ; Referral URLs: ");
    sb.append(refUrls);
    sb.append(" ; ECL Include: ");
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,7 +67,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -535,7 +534,8 @@
    throws ConfigException
  {
    super(configuration.getBaseDN().toNormalizedString(),
          configuration.getServerId());
          configuration.getServerId(),
          configuration.getInitializationWindowSize());
    /**
     * The time in milliseconds between heartbeats from the replication
@@ -1786,7 +1786,7 @@
   */
  @Override
  protected void initializeRemote(int target, int requestorID,
    Task initTask) throws DirectoryException
    Task initTask, int initWindow) throws DirectoryException
  {
    if ((target == RoutableMsg.ALL_SERVERS) && fractionalConfig.isFractional())
    {
@@ -1795,7 +1795,7 @@
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
    } else
    {
      super.initializeRemote(target, requestorID, initTask);
      super.initializeRemote(target, requestorID, initTask, this.initWindow);
    }
  }
@@ -2449,23 +2449,25 @@
   */
  public void shutdown()
  {
    // stop the flush thread
    shutdown = true;
    // stop the thread in charge of flushing the ServerState.
    if (flushThread != null)
    if (!shutdown)
    {
      synchronized (flushThread)
      shutdown = true;
      // stop the thread in charge of flushing the ServerState.
      if (flushThread != null)
      {
        flushThread.notify();
        synchronized (flushThread)
        {
          flushThread.notify();
        }
      }
      DirectoryServer.deregisterAlertGenerator(this);
      // stop the ReplicationDomain
      stopDomain();
    }
    DirectoryServer.deregisterAlertGenerator(this);
    // stop the ReplicationDomain
    stopDomain();
    // wait for completion of the persistentServerState thread.
    try
    {
@@ -3945,16 +3947,15 @@
  }
  /**
   * This method should trigger an import of the replicated data.
   * This method triggers an import of the replicated data.
   *
   * @param input                The InputStream from which
   * @param input                The InputStream from which the data are read.
   * @throws DirectoryException  When needed.
   */
  @Override
  public void importBackend(InputStream input) throws DirectoryException
  protected void importBackend(InputStream input) throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Backend backend = retrievesBackend(baseDn);
@@ -3964,8 +3965,9 @@
      {
        Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
            backend.getBackendID().toString());
        logError(message);
        de = new DirectoryException(ResultCode.OTHER, message);
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
              message));
      }
      else
      {
@@ -3997,30 +3999,33 @@
    }
    catch(Exception e)
    {
      de = new DirectoryException(ResultCode.OTHER,
          Message.raw(e.getLocalizedMessage()));
      if (ieContext.getException() == null)
        ieContext.setException(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
    }
    finally
    {
      // Cleanup
      if (importConfig != null)
      {
        importConfig.close();
        // Re-enable backend
        closeBackendImport(backend);
        backend = retrievesBackend(baseDn);
      }
      try
      {
        // Cleanup
        if (importConfig != null)
        {
          importConfig.close();
          closeBackendImport(backend); // Re-enable backend
          backend = retrievesBackend(baseDn);
        }
        loadDataState();
        if (debugEnabled())
          TRACER.debugInfo(
              "After import, the replication plugin restarts connections" +
              " to all RSs to provide new generation ID=" + generationId);
        if (ieContext.getException() != null)
        {
          // When an error occurred during an import, most of times
          // the generationId coming in the root entry of the imported data,
          // is not valid anymore (partial data in the backend).
          generationId = computeGenerationId();
          saveGenerationId(generationId);
        }
      }
      catch (DirectoryException fe)
      {
@@ -4029,15 +4034,15 @@
        // so we don't bother about the new Exception.
        // However if there was no Exception before we want
        // to return this Exception to the task creator.
        if (de == null)
          de = fe;
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_IMPORT_FAILURE.get(fe.getLocalizedMessage())));
      }
    }
    // Sends up the root error.
    if (de != null)
    {
      throw de;
    }
    if (ieContext.getException() != null)
      throw ieContext.getException();
  }
  /**
@@ -4810,14 +4815,19 @@
    }
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_IMPORT_SOURCE.get();
    if (cause != null)
    {
      Message message = ERR_INVALID_IMPORT_SOURCE.get(
          baseDn.toNormalizedString(), Integer.toString(serverId),
          Integer.toString(source),"Details:" + cause.getLocalizedMessage());
      throw new DirectoryException(
          resultCode, message, cause);
    }
    else
    {
      Message message = ERR_INVALID_IMPORT_SOURCE.get(
          baseDn.toNormalizedString(), Integer.toString(serverId),
          Integer.toString(source),"");
      throw new DirectoryException(
          resultCode, message);
    }
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -39,6 +39,7 @@
{
  // The byte array containing the bytes of the entry transported
  private byte[] entryByteArray;
  private int msgId = -1; // from V4
  /**
   * Creates a new EntryMsg.
@@ -46,52 +47,60 @@
   * @param sender      The sender of this message.
   * @param destination The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int sender,
      int destination,
      byte[] entryBytes)
      byte[] entryBytes,
      int msgId)
  {
    super(sender, destination);
    this.entryByteArray = new byte[entryBytes.length];
    System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
    this.msgId = msgId;
  }
  /**
   * Creates a new EntryMsg.
   *
   * @param serverID The sender of this message.
   * @param i The destination of this message.
   * @param entryBytes The bytes of the entry.
   * @param serverID    The sender of this message.
   * @param i           The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param pos         The starting Position in the array.
   * @param length      Number of array elements to be copied.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int serverID,
      int i,
      byte[] entryBytes,
      int pos,
      int length)
      int length,
      int msgId)
  {
    super(serverID, i);
    this.entryByteArray = new byte[length];
    System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
    this.msgId = msgId;
  }
  /**
   * Creates a new EntryMsg from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @param version The protocol version to use to decode the msg
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public EntryMsg(byte[] in) throws DataFormatException
  public EntryMsg(byte[] in, short version) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ENTRY)
        throw new DataFormatException("input is not a valid ServerStart msg");
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
@@ -107,12 +116,22 @@
      pos += length +1;
      // entry
      length = in.length - (pos + 1);
      length = getNextLength(in, pos);
      this.entryByteArray = new byte[length];
      for (int i=0; i<length; i++)
      {
        entryByteArray[i] = in[pos+i];
      }
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // msgCnt
        length = getNextLength(in, pos);
        String msgcntString = new String(in, pos, length, "UTF-8");
        this.msgId = Integer.valueOf(msgcntString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
@@ -134,16 +153,33 @@
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] msgCntBytes = null;
      byte[] entryBytes = entryByteArray;
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length +
                   1 + entryBytes.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
        length += (1 + msgCntBytes.length);
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
@@ -153,6 +189,9 @@
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      pos = addByteArray(entryBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        pos = addByteArray(msgCntBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
@@ -160,4 +199,22 @@
      return null;
    }
  }
  /**
   * Return the msg id.
   * @return The msg id.
   */
  public int getMsgId()
  {
    return this.msgId;
  }
  /**
   * Set the msg id.
   * @param msgId The msg id.
   */
  public void setMsgId(int msgId)
  {
    this.msgId = msgId;
  }
}
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,13 +22,14 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -51,6 +52,10 @@
  // Specifies the complementary details about the error that was detected
  private Message details = null;
  // The time of creation of this message.
  //                                        protocol version previous to V4
  private Long creationTime = System.currentTimeMillis();
  /**
   * Creates an ErrorMsg providing the destination server.
   *
@@ -64,9 +69,11 @@
    super(sender, destination);
    this.msgID  = details.getDescriptor().getId();
    this.details = details;
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(" Creating error message" + this.toString());
      TRACER.debugInfo(" Creating error message" + this.toString()
          + " " + stackTraceToSingleLineString(new Exception("trace")));
  }
  /**
@@ -80,6 +87,7 @@
    super(-2, i);
    this.msgID  = details.getDescriptor().getId();
    this.details = details;
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(this.toString());
@@ -89,17 +97,20 @@
   * Creates a new ErrorMsg by decoding the provided byte array.
   *
   * @param  in A byte array containing the encoded information for the Message
   * @param version The protocol version to use to decode the msg.
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded message.
   */
  public ErrorMsg(byte[] in) throws DataFormatException
  public ErrorMsg(byte[] in, short version)
  throws DataFormatException
  {
    super();
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ERROR)
        throw new DataFormatException("input is not a valid InitializeMessage");
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
@@ -125,6 +136,14 @@
      details = Message.raw(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // Creation Time
        length = getNextLength(in, pos);
        String creationTimeString = new String(in, pos, length, "UTF-8");
        creationTime = Long.valueOf(creationTimeString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
@@ -133,9 +152,9 @@
  }
  /**
   * Get the base DN from this InitializeMessage.
   * Get the details from this message.
   *
   * @return the base DN from this InitializeMessage.
   * @return the details from this message.
   */
  public Message getDetails()
  {
@@ -143,35 +162,52 @@
  }
  /**
   * Get the base DN from this InitializeMessage.
   * Get the msgID from this message.
   *
   * @return the base DN from this InitializeMessage.
   * @return the msgID from this message.
   */
  public int getMsgID()
  {
    return msgID;
  }
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    /* The InitializeMessage is stored in the form :
     * <operation type><basedn><serverid>
     */
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
      byte[] byteDetails = details.toString().getBytes("UTF-8");
      byte[] byteCreationTime = null;
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteErrMsgId.length + 1
                     + byteDetails.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteCreationTime = creationTime.toString().getBytes("UTF-8");
        length += byteCreationTime.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      // put the type of the operation
@@ -190,6 +226,12 @@
      // details
      pos = addByteArray(byteDetails, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // creation time
        pos = addByteArray(byteCreationTime, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
@@ -209,6 +251,30 @@
      " sender=" + this.senderID +
      " destination=" + this.destination +
      " msgID=" + this.msgID +
      " details=" + this.details + "]";
      " details=" + this.details +
      " creationTime=" + this.creationTime + "]";
  }
  /**
   * Get the creation time of this message.
   * When several attempts of initialization are done sequentially, it helps
   * sorting the good ones, from the ones that relate to ended initialization
   * when they are received.
   *
   * @return the creation time of this message.
   */
  public Long getCreationTime()
  {
    return creationTime;
  }
  /**
   * Get the creation time of this message.
   * @param creationTime the creation time of this message.
   */
  public void setCreationTime(long creationTime)
  {
    this.creationTime = creationTime;
  }
}
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
New file
@@ -0,0 +1,160 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
 *
 * A receiving entity should create such a message with a given credit
 * when it wants to open the send window of the remote entity.
 * A LDAP or Replication Server should increase its send window when receiving
 * such a message.
 */
public class InitializeRcvAckMsg extends RoutableMsg
{
  private final int numAck;
  /**
   * Create a new message..
   *
   * @param sender The server ID of the server that send this message.
   * @param destination The destination server or servers of this message.
   * @param numAck The number of acknowledged messages.
   *               The window will be increase by this credit number.
   */
  public InitializeRcvAckMsg(int sender, int destination, int numAck)
  {
    super(sender, destination);
    this.numAck = numAck;
  }
  /**
   * Creates a new message from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the message.
   */
  public InitializeRcvAckMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // msg type
      if (in[0] != MSG_TYPE_INITIALIZE_RCV_ACK)
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(serverIdString);
      pos += length +1;
      // value fo the ack
      length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_RCV_ACK;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // ack value
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * {@inheritDoc}
   */
  public String toString()
  {
    return this.getClass().getSimpleName()  + "=["+
      " sender=" + this.senderID +
      " destination=" + this.destination +
      " msgID=" + this.numAck + "]";
  }
  /**
   * Get the number of message acknowledged by this message.
   *
   * @return the number of message acknowledged by this message.
   */
  public int getNumAck()
  {
    return numAck;
  }
}
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -41,27 +41,33 @@
public class InitializeRequestMsg extends RoutableMsg
{
  private String baseDn = null;
  private int initWindow = 0;
  /**
   * Creates a InitializeRequestMsg message.
   *
   * @param baseDn The base DN of the replication domain.
   * @param baseDn      the base DN of the replication domain.
   * @param destination destination of this message
   * @param serverID serverID of the server that will send this message
   * @param serverID    serverID of the server that will send this message
   * @param initWindow  initialization window for flow control
   */
  public InitializeRequestMsg(String baseDn, int serverID, int destination)
  public InitializeRequestMsg(String baseDn, int serverID, int destination,
      int initWindow)
  {
    super(serverID, destination);
    this.baseDn = baseDn;
    this.initWindow = initWindow; // V4
  }
  /**
   * Creates a new InitializeRequestMsg by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * @param version The protocol version to use to decode the msg
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeRequestMsg(byte[] in) throws DataFormatException
  public InitializeRequestMsg(byte[] in, short version)
  throws DataFormatException
  {
    super();
    try
@@ -89,7 +95,14 @@
      destination = Integer.valueOf(destinationServerIdString);
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
@@ -114,21 +127,40 @@
    }
  }
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] baseDNBytes = baseDn.getBytes("UTF-8");
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).
      getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] initWindowBytes = null;
      int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
        + destinationBytes.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        initWindowBytes = String.valueOf(initWindow).getBytes("UTF-8");
        length += initWindowBytes.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      // type of the operation
@@ -144,6 +176,12 @@
      // destination
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        pos = addByteArray(initWindowBytes, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
@@ -159,6 +197,24 @@
  public String toString()
  {
    return "InitializeRequestMessage: baseDn="+baseDn+" senderId="+senderID +
    " destination=" + destination;
    " destination=" + destination + " initWindow=" + initWindow;
  }
  /**
   * Return the initWindow value.
   * @return the initWindow.
   */
  public int getInitWindow()
  {
    return this.initWindow;
  }
  /**
   * Set the initWindow value.
   * @param initWindow The initialization window.
   */
  public void setInitWindow(int initWindow)
  {
    this.initWindow = initWindow;
  }
}
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -47,31 +47,37 @@
  // is related to its own request.
  private int requestorID;
  private int initWindow;
  /**
   * Creates a InitializeDestinationMessage.
   * Creates a InitializeTargetMsg.
   *
   * @param baseDN The base DN for which the InitializeMessage is created.
   * @param serverID The serverID of the server that sends this message.
   * @param target The destination of this message.
   * @param target2 The server that initiates this export.
   * @param baseDN     The base DN for which the InitializeMessage is created.
   * @param serverID   The serverID of the server that sends this message.
   * @param target     The destination of this message.
   * @param target2    The server that initiates this export.
   * @param entryCount The count of entries that will be sent.
   * @param initWindow the initialization window.
   */
  public InitializeTargetMsg(String baseDN, int serverID,
      int target, int target2, long entryCount)
      int target, int target2, long entryCount, int initWindow)
  {
    super(serverID, target);
    this.requestorID = target2;
    this.baseDN = baseDN;
    this.entryCount = entryCount;
    this.initWindow = initWindow; // V4
  }
  /**
   * Creates an InitializeTargetMsg by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * @param version The protocol version to use to decode the msg
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeTargetMsg(byte[] in) throws DataFormatException
  public InitializeTargetMsg(byte[] in, short version)
  throws DataFormatException
  {
    super();
    try
@@ -111,6 +117,14 @@
      entryCount = Long.valueOf(entryCountString);
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
@@ -129,9 +143,12 @@
  /**
   * Get the serverID of the server that initiated the export.
   * Roughly it is the server running the task,
   * - the importer for the Initialize task,
   * - the exporter for the InitializeRemote task.
   * @return the serverID
   */
  public long getRequestorID()
  public long getInitiatorID()
  {
    return this.requestorID;
  }
@@ -143,14 +160,38 @@
   */
  public String getBaseDN()
  {
    return baseDN;
    return this.baseDN;
  }
  /**
   * Get the initializationWindow.
   *
   * @return the initialization window.
   */
  public int getInitWindow()
  {
    return this.initWindow;
  }
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  {
    try
    {
@@ -159,13 +200,19 @@
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
      byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
      byte[] byteInitWindow = null;
      int length = 1 + byteDestination.length + 1
                     + byteDn.length + 1
                     + byteSender.length + 1
                     + byteRequestor.length + 1
                     + byteEntryCount.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
        length += byteInitWindow.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
@@ -187,6 +234,12 @@
      /* put the entryCount */
      pos = addByteArray(byteEntryCount, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        /* put the initWindow */
        pos = addByteArray(byteInitWindow, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
@@ -194,4 +247,13 @@
      return null;
    }
  }
  /**
   * Set the initWindow value.
   * @param initWindow The initialization window.
   */
  public void setInitWindow(int initWindow)
  {
    this.initWindow = initWindow;
  }
}
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -61,6 +61,7 @@
   *   ReplicationServerDSMsg message.
   *   -> also added of the server URL in RSInfo of TopologyMsg
   * - Introduction of a StopMsg for proper connections ending.
   * - Initialization failover/flow control
   */
  public static final short REPLICATION_PROTOCOL_V4 = 4;
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -78,8 +78,13 @@
  static final byte MSG_TYPE_CT_HEARTBEAT = 33;
  // Added for protocol version 4
  // - New msgs types
  static final byte MSG_TYPE_REPL_SERVER_START_DS = 34;
  static final byte MSG_TYPE_STOP = 35;
  static final byte MSG_TYPE_INITIALIZE_RCV_ACK = 36;
  // - Modified msgs types
  //   EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg
  //   TopologyMsg
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -192,19 +197,19 @@
        msg = new HeartbeatMsg(buffer);
      break;
      case MSG_TYPE_INITIALIZE_REQUEST:
        msg = new InitializeRequestMsg(buffer);
        msg = new InitializeRequestMsg(buffer, version);
      break;
      case MSG_TYPE_INITIALIZE_TARGET:
        msg = new InitializeTargetMsg(buffer);
        msg = new InitializeTargetMsg(buffer, version);
      break;
      case MSG_TYPE_ENTRY:
        msg = new EntryMsg(buffer);
        msg = new EntryMsg(buffer, version);
      break;
      case MSG_TYPE_DONE:
        msg = new DoneMsg(buffer);
      break;
      case MSG_TYPE_ERROR:
        msg = new ErrorMsg(buffer);
        msg = new ErrorMsg(buffer, version);
      break;
      case MSG_TYPE_RESET_GENERATION_ID:
        msg = new ResetGenerationIdMsg(buffer);
@@ -248,6 +253,9 @@
      case MSG_TYPE_STOP:
        msg = new StopMsg(buffer);
      break;
      case MSG_TYPE_INITIALIZE_RCV_ACK:
        msg = new InitializeRcvAckMsg(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -81,7 +81,8 @@
  }
  /**
   * Get the destination.
   * Get the destination. The value is a serverId, or ALL_SERVERS dedicated
   * value.
   * @return the destination
   */
  public int getDestination()
@@ -93,7 +94,7 @@
   * Get the server ID of the server that sent this message.
   * @return the server id
   */
  public int getsenderID()
  public int getSenderID()
  {
    return this.senderID;
  }
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -175,7 +175,9 @@
            oStream.write(attr.getBytes("UTF-8"));
            oStream.write(0);
          }
          oStream.write(dsInfo.getProtocolVersion());
        }
      }
      // Put number of following RS info entries
@@ -302,6 +304,7 @@
        }
        Set<String> attrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byte nAttrs = in[pos++];
@@ -317,12 +320,15 @@
            pos += length + 1;
            nRead++;
          }
          /* Read Protocol version */
          protocolVersion = Short.valueOf(in[pos++]);
        }
        /* Now create DSInfo and store it in list */
        DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs);
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
          protocolVersion);
        dsList.add(dsInfo);
        nDsInfo--;
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -664,7 +664,7 @@
  {
    DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
      eclIncludes);
      eclIncludes, protocolVersion);
    return dsInfo;
  }
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -87,6 +87,8 @@
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // DS safe data level (relevant if assured mode is safe data)
  private byte safeDataLevel = (byte) -1;
  // The prococol version
  private short protocolVersion = -1;
  private Set<String> eclInclude = new HashSet<String>();
@@ -107,11 +109,13 @@
   * @param assuredMode The assured mode of the remote DS
   * @param safeDataLevel The safe data level of the remote DS
   * @param eclInclude The list of entry attributes to be added to the ECL.
   * @param protocolVersion The protocol version supported by the remote DS.
   */
  public LightweightServerHandler(ReplicationServerHandler replServerHandler,
    int replicationServerId, int serverId, long generationId, byte groupId,
    ServerStatus status, List<String> refUrls, boolean assuredFlag,
    AssuredMode assuredMode, byte safeDataLevel, Set<String> eclInclude)
    AssuredMode assuredMode, byte safeDataLevel, Set<String> eclInclude,
    short protocolVersion)
  {
    super("Server Handler");
    this.replServerHandler = replServerHandler;
@@ -126,6 +130,7 @@
    this.assuredMode = assuredMode;
    this.safeDataLevel = safeDataLevel;
    this.eclInclude = eclInclude;
    this.protocolVersion = protocolVersion;
    if (debugEnabled())
      TRACER.debugInfo(
@@ -144,7 +149,7 @@
  {
    DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
      eclInclude);
      eclInclude, protocolVersion);
    return dsInfo;
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -70,6 +70,7 @@
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -1591,11 +1592,11 @@
   */
  public void process(RoutableMsg msg, ServerHandler senderHandler)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (!(msg instanceof InitializeRequestMsg) &&
        !(msg instanceof InitializeTargetMsg) &&
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.replicationServer.getServerId()))
@@ -1616,7 +1617,7 @@
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg =
            createGlobalTopologyMonitorMsg(
                msg.getDestination(), msg.getsenderID(), false);
                msg.getDestination(), msg.getSenderID(), false);
           if (monitorMsg != null)
          {
@@ -1634,7 +1635,7 @@
          // Monitoring information requested by a RS
          MonitorMsg monitorMsg =
            createLocalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
            msg.getSenderID());
          if (monitorMsg != null)
          {
@@ -1668,7 +1669,7 @@
            NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
        mb1.append("serverID:" + msg.getDestination());
        ErrorMsg errMsg = new ErrorMsg(
          msg.getsenderID(), mb1.toMessage());
          msg.getSenderID(), mb1.toMessage());
        try
        {
          senderHandler.send(errMsg);
@@ -1687,15 +1688,15 @@
    if (servers.isEmpty())
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
          this.baseDn, Integer.toString(msg.getDestination())));
      mb.append(" In Replication Server=" +
        this.replicationServer.getMonitorInstanceName());
      mb.append(" domain =" + this.baseDn);
      mb.append(" unroutable message =" + msg.toString());
      mb.append(" routing table is empty");
      mb.append(" unroutable message =" + msg.getClass().getSimpleName());
      mb.append(" Details:routing table is empty");
      ErrorMsg errMsg = new ErrorMsg(
        this.replicationServer.getServerId(),
        msg.getsenderID(),
        msg.getSenderID(),
        mb.toMessage());
      logError(mb.toMessage());
      try
@@ -1728,18 +1729,14 @@
           * to its destination server.
           * Send back an error to the originator of the message.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
          mb.append(stackTraceToSingleLineString(ioe));
          mb.append(" ");
          mb.append(msg.getClass().getCanonicalName());
          logError(mb.toMessage());
          MessageBuilder mb1 = new MessageBuilder();
          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
          mb1.append("serverID:" + msg.getDestination());
          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
              this.baseDn, Integer.toString(msg.getDestination())));
          mb1.append(" unroutable message =" + msg.getClass().getSimpleName());
          mb1.append(" Details: " + ioe.getLocalizedMessage());
          ErrorMsg errMsg = new ErrorMsg(
            msg.getsenderID(), mb1.toMessage());
            msg.getSenderID(), mb1.toMessage());
          logError(mb1.toMessage());
          try
          {
            senderHandler.send(errMsg);
@@ -2729,7 +2726,7 @@
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
              Integer.toString(msg.getsenderID())));
              Integer.toString(msg.getSenderID())));
          return;
        }
        // Here is the RS state : list <serverID, lastChangeNumber>
@@ -2738,7 +2735,7 @@
        wrkMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
        wrkMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2789,7 +2786,7 @@
            TRACER.debugInfo(
              "In " + this +
              " baseDn=" + baseDn +
              " Processed msg from " + msg.getsenderID() +
              " Processed msg from " + msg.getSenderID() +
              " New monitor data: " + wrkMonitorData.toString());
        }
      }
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -689,7 +689,8 @@
            dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
            dsInfo.isAssured(), dsInfo.getAssuredMode(),
            dsInfo.getSafeDataLevel(),
            dsInfo.getEclIncludes());
            dsInfo.getEclIncludes(),
            dsInfo.getProtocolVersion());
        lsh.startHandler();
        remoteDirectoryServers.put(lsh.getServerId(), lsh);
      }
@@ -843,20 +844,6 @@
  }
  /**
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
   * @param  msg         The GenerationIdMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
  public void forwardReplicationMsg(ReplicationMsg msg)
    throws IOException
  {
    session.publish(msg);
  }
  /**
   * Receives a topology msg.
   * @param topoMsg The message received.
   * @throws DirectoryException when it occurs.
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,9 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -952,7 +955,7 @@
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " processes received msg:\n" + msg);
          " processes routable msg received:" + msg);
    replicationServerDomain.process(msg, this);
  }
@@ -1012,8 +1015,11 @@
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " publishes message:\n" + msg);
    // Currently only MonitorMsg has to support a backward compatibility
    if (msg instanceof MonitorMsg)
    if ((msg instanceof MonitorMsg) || (msg instanceof ErrorMsg) ||
        (msg instanceof EntryMsg) || (msg instanceof InitializeRequestMsg) ||
        (msg instanceof InitializeTargetMsg))
    {
      session.publish(msg, protocolVersion);
    } else
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -100,9 +100,7 @@
          ReplicationMsg msg = session.receive();
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + getName() + " receives " + msg);
          }
          if (msg instanceof AckMsg)
          {
@@ -187,6 +185,11 @@
            InitializeRequestMsg initializeMsg =
              (InitializeRequestMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof InitializeRcvAckMsg)
          {
            InitializeRcvAckMsg initializeRcvAckMsg =
              (InitializeRcvAckMsg) msg;
            handler.process(initializeRcvAckMsg);
          } else if (msg instanceof InitializeTargetMsg)
          {
            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -316,6 +316,15 @@
  }
  /**
   * Set the generation id - for test purpose.
   * @param generationID The generation id
   */
  public void setGenerationID(long generationID)
  {
    this.generationID = generationID;
  }
  /**
   * Gets the server url of the RS we are connected to.
   * @return The server url of the RS we are connected to
   */
@@ -727,6 +736,15 @@
    {
      this.locallyConfigured = locallyConfigured;
    }
    /**
     * Returns a string representation of this object.
     * @return A string representation of this object.
     */
    public String toString()
    {
      return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
    }
  }
  private void connect()
@@ -859,7 +877,8 @@
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
            "phase 2 : will perform PhaseOneH with the preferred RS.");
            "phase 2 : will perform PhaseOneH with the preferred RS="
              + replicationServerInfo);
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true);
@@ -2225,18 +2244,20 @@
  /**
   * restart the ReplicationBroker.
   * @param infiniteTry the socket which failed
   */
  public void reStart()
  public void reStart(boolean infiniteTry)
  {
    reStart(this.session);
    reStart(this.session, infiniteTry);
  }
  /**
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   * @param infiniteTry the socket which failed
   */
  public void reStart(ProtocolSession failingSession)
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
@@ -2268,6 +2289,7 @@
      rsGroupId = (byte) -1;
      rsServerId = -1;
      rsServerUrl = null;
      session = null;
    }
    while (!this.connected && (!this.shutdown))
    {
@@ -2282,6 +2304,8 @@
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      if ((!connected) && (!infiniteTry))
        break;
      if ((!connected) && (!shutdown))
      {
        try
@@ -2293,6 +2317,11 @@
        }
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(this +
          " end restart : connected=" + connected +
          " with RSid=" + this.getRsServerId() +
          " genid=" + this.generationID);
  }
  /**
@@ -2301,7 +2330,18 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false);
    _publish(msg, false, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg            The message to publish.
   * @param retryOnFailure Whether reconnect should automatically be done.
   * @return               Whether publish succeeded.
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return _publish(msg, false, retryOnFailure);
  }
  /**
@@ -2310,15 +2350,18 @@
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true);
    _publish(msg, true, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
   * @param recoveryMsg the message is a recovery Message
   * @param retryOnFailure whether retry should be done on failure
   * @return whether the message was successfully sent.
   */
  void _publish(ReplicationMsg msg, boolean recoveryMsg)
  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
      boolean retryOnFailure)
  {
    boolean done = false;
@@ -2338,7 +2381,7 @@
            "message is not possible due to existing connection error.");
        }
        return;
        return false;
      }
      try
@@ -2365,7 +2408,7 @@
        // do it.
        if (!recoveryMsg & connectRequiresRecovery)
        {
          return;
          return false;
        }
        if (msg instanceof UpdateMsg)
@@ -2408,6 +2451,9 @@
        }
      } catch (IOException e)
      {
        if (!retryOnFailure)
          return false;
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
        synchronized (connectPhaseLock)
@@ -2435,6 +2481,7 @@
        }
      }
    }
    return true;
  }
  /**
@@ -2450,7 +2497,7 @@
   */
  public ReplicationMsg receive() throws SocketTimeoutException
  {
    return receive(false);
    return receive(false, true, false);
  }
  /**
@@ -2459,22 +2506,29 @@
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   * @param allowReconnectionMechanism If true, this allows the reconnection
   * mechanism to disconnect the broker if it detects that it should reconnect
   * to another replication server because of some criteria defined by the
   * algorithm where we choose a suitable replication server.
   * @param reconnectToTheBestRS Whether broker will automatically switch
   *                             to the best suitable RS.
   * @param reconnectOnFailure   Whether broker will automatically reconnect
   *                             on failure.
   * @param returnOnTopoChange   Whether broker should return TopologyMsg
   *                             received.
   * @return the received message
   *
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  public ReplicationMsg receive(boolean allowReconnectionMechanism)
  public ReplicationMsg receive(boolean reconnectToTheBestRS,
      boolean reconnectOnFailure, boolean returnOnTopoChange)
    throws SocketTimeoutException
  {
    while (shutdown == false)
    {
      if (!connected)
      if ((reconnectOnFailure) && (!connected))
      {
        reStart(null);
        // infinite try to reconnect
        reStart(null, true);
      }
      ProtocolSession failingSession = session;
@@ -2496,11 +2550,16 @@
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg);
          if (allowReconnectionMechanism)
          if (reconnectToTheBestRS)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
          }
          // Caller wants to check what's changed
          if (returnOnTopoChange)
            return msg;
        } else if (msg instanceof StopMsg)
        {
          /*
@@ -2512,7 +2571,7 @@
            Integer.toString(serverId));
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
          this.reStart(failingSession, true);
        } else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
@@ -2551,7 +2610,7 @@
          // it is still the one we are currently connected to. If not,
          // disconnect properly and let the connection algorithm re-connect to
          // best replication server
          if (allowReconnectionMechanism)
          if (reconnectToTheBestRS)
          {
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
@@ -2572,9 +2631,10 @@
                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                  Integer.toString(serverId),
                  Integer.toString(rsServerId),
                  rsServerUrl);
                  rsServerUrl,
                  Integer.toString(bestServerInfo.getServerId()));
                logError(message);
                reStart();
                reStart(null, true);
              }
              // Reset wait time before next computation of best server
@@ -2603,10 +2663,13 @@
              Integer.toString(serverId));
            logError(message);
          }
          this.reStart(failingSession);
          if (reconnectOnFailure)
            reStart(failingSession, true);
          else
            break; // does not seem necessary to explicitely disconnect ..
        }
      }
    }
    } // while !shutdown
    return null;
  }
@@ -2676,11 +2739,10 @@
  public void stop()
  {
    if (debugEnabled())
    {
      debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
        " close the connection to replication server " + rsServerId + " for" +
        " domain " + baseDn);
    }
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
@@ -2690,25 +2752,17 @@
    rsServerId = -1;
    rsServerUrl = null;
    if (session != null)
    try
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
      } catch (IOException e)
      {
      }
      session.close();
    } catch (Exception e)
    {
      // Anyway, going to close session, so nothing to do
    }
  }
@@ -2979,6 +3033,9 @@
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
@@ -3100,4 +3157,14 @@
  {
    connectRequiresRecovery = b;
  }
  /**
   * Returns whether the broker is shutting down.
   * @return whether the broker is shutting down.
   */
  public boolean shuttingDown()
  {
    return shutdown;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -72,8 +72,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.loggers.debug.DebugTracer;
@@ -91,11 +92,13 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -284,6 +287,15 @@
  private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /**
   * Window size used during initialization .. between
   * - the initializer/exporter DS that listens/waits acknowledges and that
   *   slows down data msg publishing based on the slowest server
   * - and each initialized/importer DS that publishes acknowledges each
   *   WINDOW/2 data msg received.
   */
  protected int initWindow = 100;
  /* Status related monitoring fields */
  // Indicates the date when the status changed. This may be used to indicate
@@ -328,6 +340,28 @@
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param initWindow Window used during initialization.
   */
  public ReplicationDomain(String serviceID, int serverID,int initWindow)
  {
    this.serviceID = serviceID;
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.state = new ServerState();
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param serviceID  The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   */
  public ReplicationDomain(String serviceID, int serverID)
  {
@@ -557,6 +591,22 @@
  }
  /**
   * Check if a remote replica (DS) is connected to the topology based on
   * the TopologyMsg we received when the remote replica connected or
   * disconnected.
   *
   * @param serverId The provided serverId of the remote replica
   * @return whether the remote replica is connected or not.
   */
  public boolean isRemoteDSConnected(int serverId)
  {
    for (DSInfo remoteDS : getReplicasList())
      if (remoteDS.getDsId() == serverId)
        return true;
    return false;
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
@@ -708,7 +758,8 @@
  /**
   * Receives an update message from the replicationServer.
   * also responsible for updating the list of pending changes
   * The other types of messages are processed in an opaque way for the caller.
   * Also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  UpdateMsg receive()
@@ -717,11 +768,11 @@
    while (update == null)
    {
      InitializeRequestMsg initMsg = null;
      InitializeRequestMsg initReqMsg = null;
      ReplicationMsg msg;
      try
      {
        msg = broker.receive(true);
        msg = broker.receive(true, true, false);
        if (msg == null)
        {
          // The server is in the shutdown process
@@ -741,54 +792,58 @@
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMsg)msg;
          initReqMsg = (InitializeRequestMsg)msg;
        }
        else if (msg instanceof InitializeTargetMsg)
        {
          // Another server is exporting its entries to us
          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMsg errorMsg =
              new ErrorMsg(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
            logError(de.getMessageObject());
          }
          // This must be done while we are still holding the
          // broker lock because we are now going to receive a
          // bunch of entries from the remote server and we
          // want the import thread to catch them and
          // not the ListenerThread.
          initialize(initTargetMsg, initTargetMsg.getSenderID());
        }
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMsg)msg);
            //    For example, when we publish a request and the
            //    replicationServer did not find the import source.
            //
            // A remote error during the import will be received in the
            // receiveEntryBytes() method.
            //
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
                  " serviceID: " + this.serviceID +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg);
            }
            else
            {
              // Simply log - happen when the ErrorMsg relates to a previous
              // attempt of initialization while we have started a new one
              // on this side.
              logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
            }
          }
          else
          {
            /*
             * Log error message
             */
            ErrorMsg errorMsg = (ErrorMsg)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            // Simply log - happen if import/export has been terminated
            // on our side before receiving this ErrorMsg.
            logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
          }
        }
        else if (msg instanceof ChangeStatusMsg)
@@ -801,6 +856,15 @@
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          if (ieContext != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg When no input/export is running/should never happen
        }
      }
      catch (SocketTimeoutException e)
      {
@@ -815,10 +879,11 @@
      // entries to the remote can be handled by the other
      // replay thread when they call this method and therefore the
      // broker.receive() method.
      if (initMsg != null)
      if (initReqMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
        ExportThread exportThread = new ExportThread(
            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
        exportThread.start();
      }
    }
@@ -989,23 +1054,29 @@
   */
  /**
   * This thread is launched when we want to export data to another server that
   * has requested to be initialized with the data of our backend.
   * This thread is launched when we want to export data to another server.
   *
   * When a task is created locally (so this local server is the initiator)
   * of the export (Exemple: dsreplication initialize-all),
   * this thread is NOT used but the task thread is running the export instead).
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private int target;
    // Id of server that will be initialized
    private int serverToInitialize;
    private int initWindow;
    /**
     * Constructor for the ExportThread.
     *
     * @param i Id of server that will receive updates
     * @param serverToInitialize serverId of server that will receive entries
     */
    public ExportThread(int i)
    public ExportThread(int serverToInitialize, int initWindow)
    {
      super("Export thread " + serverID);
      this.target = i;
      super("Export thread from serverId=" + serverID
          + " to serverId=" + serverToInitialize);
      this.serverToInitialize = serverToInitialize;
      this.initWindow = initWindow;
    }
    /**
@@ -1015,22 +1086,20 @@
    public void run()
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread starting.");
      }
        TRACER.debugInfo("[IE] starting " + this.getName());
      try
      {
        initializeRemote(target, target, null);
        initializeRemote(serverToInitialize, serverToInitialize, null,
            initWindow);
      } catch (DirectoryException de)
      {
      // An error message has been sent to the peer
      // Nothing more to do locally
        // An error message has been sent to the peer
        // This server is not the initiator of the export so there is
        // nothing more to do locally.
      }
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread stopping.");
      }
        TRACER.debugInfo("[IE] ending " + this.getName());
    }
  }
@@ -1052,13 +1121,49 @@
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    // The exception raised when any
    // Exception raised during the initialization.
    DirectoryException exception = null;
    // A boolean indicating if the context is related to an
    // import or an export.
    // Whether the context is related to an import or an export.
    boolean importInProgress;
    // Current counter of messages exchanged during the initialization
    int msgCnt = 0;
    // Number of connections lost when we start the initialization.
    // Will help counting connections lost during initialization,
    int initNumLostConnections = 0;
    // Request message sent when this server has the initializeFromRemote task.
    InitializeRequestMsg initReqMsgSent = null;
    // Start time of the initialization process. ErrorMsg timestamped
    // before thi startTime will be ignored.
    long startTime;
    // List fo replicas (DS) connected to the topology when
    // initialization started.
    Set<Integer> startList = new HashSet<Integer>(0);
    // List fo replicas (DS) with a failure (disconnected from the topology)
    // since the initialization started.
    Set<Integer> failureList = new HashSet<Integer>(0);
    // Flow control during initialization
    // - for each remote server, counter of messages received
    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
    // - serverId of the slowest server (the one with the smallest non null
    //   counter)
    private int slowestServerId = -1;
    short exporterProtocolVersion = -1;
    // Window used during this initialization
    int initWindow;
    // Number of attempt already done for this initialization
    short attemptCnt;
    /**
     * Creates a new IEContext.
     *
@@ -1069,19 +1174,21 @@
    public IEContext(boolean importInProgress)
    {
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
      this.attemptCnt = 0;
    }
    /**
     * Initializes the import/export counters with the provider value.
     * @param total Total number of entries to be processed.
     * @param left Remaining number of entries to be processed.
     * @throws DirectoryException if an error occurred.
     */
    public void setCounters(long total, long left)
    private void initializeCounters(long total)
      throws DirectoryException
    {
      entryCount = total;
      entryLeftCount = left;
      entryLeftCount = total;
      if (initializeTask != null)
      {
@@ -1193,7 +1300,42 @@
    {
      this.exception = exception;
    }
  }
    /**
     * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
     * (updated via the listener thread)
     * @param serverId serverId of the acknowledger/receiver/importer server.
     * @param numAck   id of the message received.
     */
    public void setAckVal(int serverId, int numAck)
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck);
      this.ackVals.put(serverId, numAck);
      // Recompute the server with the minAck returned,means the slowest server.
      slowestServerId = serverId;
      for (Integer sid : ieContext.ackVals.keySet())
        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
          slowestServerId = sid;
    }
    /**
     * Returns the serverId of the server that acknowledged the smallest
     * EntryMsg id.
     * @return serverId of the server with latest acknowledge.
     *                  0 when no ack has been received yet.
     */
    public int getSlowestServer()
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId
            + " " + this.ackVals.get(slowestServerId));
      return this.slowestServerId;
    }
}
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
@@ -1260,34 +1402,10 @@
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, serverID, initTask);
    if (target == RoutableMsg.ALL_SERVERS)
    {
      // Check for the status of all remote servers to check if they
      // are all finished with the import.
      boolean done = true;
      do
      {
        done = true;
        for (DSInfo dsi : getReplicasList())
        {
          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            done = false;
            try
            {
              Thread.sleep(100);
            } catch (InterruptedException e)
            {
              // just loop again.
            }
            break;
          }
        }
      }
      while (!done);
    }
    initializeRemote(target, this.serverID, initTask, this.initWindow);
  }
  /**
@@ -1295,76 +1413,332 @@
   * specified by the target argument when this initialization specifying the
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param target2 The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   * @param serverToInitialize The target server that should be initialized.
   * @param serverRunningTheTask The server that initiated the export. It can
   * be the serverID of this server, or the serverID of a remote server.
   * @param initTask The task in this server that triggers this initialization
   * and that should be updated with its progress. Null when the export is done
   * following a request coming from a remote server (task is remote).
   * @param initWindow The value of the initialization window for flow control
   * between the importer and the exporter.
   *
   * @exception DirectoryException When an error occurs.
   * @exception DirectoryException When an error occurs. No exception raised
   * means success.
   */
  protected void initializeRemote(int target, int target2,
    Task initTask) throws DirectoryException
  protected void initializeRemote(int serverToInitialize,
      int serverRunningTheTask, Task initTask, int initWindow)
  throws DirectoryException
  {
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
        Integer.toString(serverID),
      serviceID,
      Integer.toString(target2));
    logError(msg);
    DirectoryException exportRootException = null;
    boolean contextAcquired = false;
    boolean contextAcquired=false;
    // Acquire and initialize the export context
    acquireIEContext(false);
    contextAcquired = true;
    ieContext.exportTarget = target;
    if (initTask != null)
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
        Integer.toString(serverID), Long.toString(countEntries()), serviceID,
        Integer.toString(serverToInitialize));
    logError(msg);
    // We manage the list of servers to initialize in order :
    // - to test at the end that all expected servers have reconnected
    //   after their import and with the right genId
    // - to update the task with the server(s) where this test failed
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
      for (DSInfo dsi : getReplicasList())
        ieContext.startList.add(dsi.getDsId());
    else
      ieContext.startList.add(serverToInitialize);
    // We manage the list of servers with which a flow control can be enabled
    for (DSInfo dsi : getReplicasList())
    {
      ieContext.initializeTask = initTask;
      if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        ieContext.setAckVal(dsi.getDsId(), 0);
    }
    // The number of entries to be exported is the number of entries under
    // the base DN entry and the base entry itself.
    long entryCount = this.countEntries();
    ieContext.setCounters(entryCount, entryCount);
    // Send start message to the peer
    InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
        serviceID, serverID, target, target2, entryCount);
    broker.publish(initializeMessage);
    try
    // loop for the case where the exporter is the initiator
    int attempt = 0;
    boolean done = false;
    while ((!done) && (++attempt<2)) // attempt loop
    {
      exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
      try
      {
        ieContext.exportTarget = serverToInitialize;
        if (initTask != null)
          ieContext.initializeTask = initTask;
        ieContext.initializeCounters(this.countEntries());
        ieContext.msgCnt = 0;
        ieContext.initNumLostConnections = broker.getNumLostConnections();
        ieContext.initWindow = initWindow;
      // Notify the peer of the success
      DoneMsg doneMsg = new DoneMsg(serverID,
          initializeMessage.getDestination());
      broker.publish(doneMsg);
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            serviceID, serverID, serverToInitialize, serverRunningTheTask,
            ieContext.entryCount, initWindow);
        broker.publish(initTargetMsg);
        // Wait for all servers to be ok
        waitForRemoteStartOfInit();
        // Servers that left in the list are those for which we could not test
        // that they have been successfully initialized.
        if (!ieContext.failureList.isEmpty())
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
                  ieContext.failureList.toString()));
        }
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
        // Notify the peer of the success
        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
        broker.publish(doneMsg);
      }
      catch(DirectoryException exportException)
      {
        // Give priority to the first exception raised - stored in the context
        if (ieContext.exception != null)
          exportRootException = ieContext.exception;
        else
          exportRootException = exportException;
      }
      if (debugEnabled())
        TRACER.debugInfo(
           "[IE] In " + this.monitor.getMonitorInstanceName()
           + " export ends with "
           + " connected=" + broker.isConnected()
           + " exportRootException=" + exportRootException);
      if (exportRootException != null)
      {
        try
        {
          // Handling the errors during export
          // Note: we could have lost the connection and another thread
          //       the listener one) has already managed to reconnect.
          //       So we MUST rely on the test broker.isConnected()
          //       ONLY to do 'wait to be reconnected by another thread'
          //       (if not yet reconnected already).
          if (!broker.isConnected())
          {
            // We are still disconnected, so we wait for the listener thread
            // to reconnect - wait 10s
            if (debugEnabled())
              TRACER.debugInfo(
                "[IE] Exporter wait for reconnection by the listener thread");
            int att=0;
            while ((!broker.shuttingDown()) &&
                (!broker.isConnected())&& (++att<100))
              try { Thread.sleep(100); } catch(Exception e){}
          }
          if ((initTask != null) && broker.isConnected() &&
              (serverToInitialize != RoutableMsg.ALL_SERVERS))
          {
            // NewAttempt case : In the case where
            // - it's not an InitializeAll
            // - AND the previous export attempt failed
            // - AND we are (now) connected
            // - and we own the task and this task is not an InitializeAll
            // Let's :
            // - sleep to let time to the other peer to reconnect if needed
            // - and launch another attempt
            try { Thread.sleep(1000); } catch(Exception e){}
            logError(NOTE_RESENDING_INIT_TARGET.get((exportRootException!=null?
                exportRootException.getLocalizedMessage():"")));
            continue;
          }
          ErrorMsg errorMsg =
              new ErrorMsg(serverToInitialize,
                  exportRootException.getMessageObject());
          broker.publish(errorMsg);
        }
        catch(Exception e)
        {
          // Ignore the failure raised while proceeding the root failure
        }
      }
      // We are always done for this export ...
      // ... except in the NewAttempt case (see above)
      done = true;
    } // attempt loop
    // Wait for all servers to be ok, and build the failure list
    waitForRemoteEndOfInit();
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty())
    {
      if (exportRootException == null)
        exportRootException = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
              Long.toString(getGenerationID()),
              ieContext.failureList.toString()));
    }
    if (contextAcquired)
      releaseIEContext();
    }
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMsg errorMsg =
        new ErrorMsg(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
      if (contextAcquired)
        releaseIEContext();
      throw(de);
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
        Integer.toString(serverID),
      serviceID,
      Integer.toString(target2));
        serviceID,
        Integer.toString(serverToInitialize),
      (exportRootException!=null?exportRootException.getLocalizedMessage():""));
    logError(msg);
    if (exportRootException != null)
    {
      throw(exportRootException);
    }
  }
  /*
   * For all remote servers in tht start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
   */
  private void waitForRemoteStartOfInit()
  {
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
      "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    boolean done = true;
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "[IE] wait for start dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still not doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(100); } catch (InterruptedException e) {}
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is ok
            replicasWeAreWaitingFor.remove(dsi.getDsId());
          }
        }
      }
    }
    while ((!done) && (waitResultAttempt<1200) // 2mn
        && (!broker.shuttingDown()));
    // Add to the failure list the servers that were here at start time but
    // that never ended with the right generationId.
    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
      ieContext.failureList.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for start ends with " + ieContext.failureList);
  }
  /*
   * For all remote servers in tht start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
   */
  private void waitForRemoteEndOfInit()
  {
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
      replicasWeAreWaitingFor.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    // In case some new servers appear during the init, we want them to be
    // considered in the processing of sorting the successfully initialized
    // and the others
    for (DSInfo dsi : getReplicasList())
      replicasWeAreWaitingFor.add(dsi.getDsId());
    boolean done = true;
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "[IE] wait for end dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
        if (!ieContext.failureList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is done with the Full Update
            if (dsi.getGenerationId() == this.getGenerationID())
            {
              // and with the expected generationId
              replicasWeAreWaitingFor.remove(dsi.getDsId());
            }
          }
        }
      }
    }
    while ((!done) && (!broker.shuttingDown())); // infinite wait
    // Add to the failure list the servers that were here at start time but
    // that never ended with the right generationId.
    for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
      ieContext.failureList.add(sid);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end ends with " + ieContext.failureList);
  }
  /**
@@ -1398,33 +1772,42 @@
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * Processes an error message received while an export is
   * on going, or an import will start.
   *
   * @param errorMsg The error message received.
   */
  void abandonImportExport(ErrorMsg errorMsg)
  private void processErrorMsg(ErrorMsg errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverID +
          " serviceID: " + this.serviceID +
          " Error Msg received: " + errorMsg);
    if (ieContext != null)
    {
      ieContext.setException(new DirectoryException(ResultCode.OTHER,
        errorMsg.getDetails()));
      if (ieContext.initializeTask instanceof InitializeTask)
      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(ieContext.getException());
        // The ErrorMsg is received while we have started an initialization
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails()));
        releaseIEContext();
        /*
         * This can happen :
         * - on the first InitReqMsg sent when source in not known for example
         * - on the next attempt when source crashed and did not reconnect
         *   even after the nextInitAttemptDelay
         * During the import, the ErrorMsg will be received by receiveEntryBytes
         */
        if (ieContext.initializeTask instanceof InitializeTask)
        {
          // Update the task that initiated the import
          ((InitializeTask)ieContext.initializeTask).
          updateTaskCompletionState(ieContext.getException());
          releaseIEContext();
        }
      }
      else
      {
        // When we are the exporter in the case of initializeAll
        // exporting must not be stopped on the first error.
      }
    }
  }
@@ -1442,24 +1825,72 @@
    {
      try
      {
        msg = broker.receive();
        // In the context of the total update, we don't want any automatic
        // re-connection done transparently by the broker because of a better
        // RS or because of a connection failure.
        // We want to be notified of topology change in order to track a
        // potential disconnection of the exporter.
        msg = broker.receive(false, false, true);
        if (debugEnabled())
          TRACER.debugVerbose(
              " sid:" + serverID +
              " base DN:" + serviceID +
              " Import EntryBytes received " + msg);
          TRACER.debugInfo(
              "[IE] In " + this.monitor.getMonitorInstanceName() +
            ", receiveEntryBytes " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
          if (broker.shuttingDown())
          {
            // The server is in the shutdown process
            return null;
          }
          else
          {
            // Handle connection issues
            if (ieContext.getException() == null)
              ieContext.setException(new DirectoryException(
                  ResultCode.OTHER,
                  ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get(
                      broker.getReplicationServer())));
            return null;
          }
        }
        // Check good sequentiality of msg received
        if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters(countEntryLimits(entryBytes));
          if (ieContext.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check sequenciality
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            {
              if (ieContext.getException() == null)
                ieContext.setException(new DirectoryException(ResultCode.OTHER,
                    ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
                        String.valueOf(ieContext.msgCnt),
                        String.valueOf(entryMsg.getMsgId()))));
              return null;
            }
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            {
              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  this.serverID,
                  entryMsg.getSenderID(),
                  ieContext.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
                TRACER.debugInfo(
                    "[IE] In " + this.monitor.getMonitorInstanceName() +
                    ", publish InitializeRcvAckMsg" + amsg);
            }
          }
          return entryBytes;
        }
        else if (msg instanceof DoneMsg)
@@ -1474,22 +1905,43 @@
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMsg errorMsg = (ErrorMsg)msg;
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
            errorMsg.getDetails()));
          return null;
          if (ieContext.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
            if (errMsg.getCreationTime() > ieContext.startTime)
            {
              ieContext.setException(
                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
              return null;
            }
          }
        }
        else
        {
          // Other messages received during an import are trashed
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if ((msg instanceof TopologyMsg) &&
              (!this.isRemoteDSConnected(ieContext.importSource)))
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      this.serviceID,
                      Integer.toString(this.serverID),
                      Integer.toString(ieContext.importSource)));
            if (ieContext.getException()==null)
              ieContext.setException(new DirectoryException(ResultCode.OTHER,
                errMsg));
            return null;
          }
        }
      }
      catch(Exception e)
      {
        // TODO: i18n
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
          Message.raw("received an unexpected message type" +
          e.getLocalizedMessage())));
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
      }
    }
  }
@@ -1540,27 +1992,108 @@
   *
   * @throws IOException when an error occurred.
   */
  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException
  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  throws IOException
  {
    // If an error was raised - like receiving an ErrorMsg
    // we just let down the export.
    if (ieContext.getException() != null)
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
    // build the message
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
    // Waiting the slowest loop
    while (!broker.shuttingDown())
    {
      IOException ioe = new IOException(ieContext.getException().getMessage());
      ieContext = null;
      throw ioe;
      // If an error was raised - like receiving an ErrorMsg from a remote
      // server that have been stored by the listener thread in the ieContext,
      // we just abandon the export by throwing an exception.
      if (ieContext.getException() != null)
        throw(new IOException(ieContext.getException().getMessage()));
      int slowestServerId = ieContext.getSlowestServer();
      if (!isRemoteDSConnected(slowestServerId))
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
                Integer.toString(ieContext.getSlowestServer()))));
        // .. and abandon the export by throwing an exception.
        IOException ioe =
          new IOException("IOException with nested DirectoryException");
        ioe.initCause(ieContext.getException());
        throw ioe;
      }
      int ourLastExportedCnt = ieContext.msgCnt;
      int slowestCnt = ieContext.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
        // our export is too far beyond the slowest importer - let's wait
        try { Thread.sleep(100); } catch(Exception e) {}
        // process any connection error
        if ((broker.hasConnectionError())||
            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
                  Integer.toString(broker.getRsServerId())));
          if (ieContext.getException() == null)
            ieContext.setException(de);
          // .. and abandon the export by throwing an exception.
          throw new IOException(de.getMessage());
        }
      }
      else
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] slowest got to us => stop waiting");
        break;
      }
    } // Waiting the slowest loop
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
    // publish the message
    boolean sent = broker.publish(entryMessage, false);
    // process any publish error
    if (((!sent)||
        (broker.hasConnectionError()))||
        (broker.getNumLostConnections() != ieContext.initNumLostConnections))
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
              Integer.toString(broker.getRsServerId())));
      if (ieContext.getException() == null)
        ieContext.setException(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length);
    broker.publish(entryMessage);
    // publish succeeded
    try
    {
      ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
    }
    catch (DirectoryException de)
    {
      // store the error in the ieContext ...
      if (ieContext.getException() == null)
        ieContext.setException(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
  }
@@ -1614,127 +2147,285 @@
  }
  /**
   * Initializes this domain from another source server.
   * Initializes asynchronously this domain from a remote source server.
   * Before returning from this call, for the provided task :
   * - the progressing counters are updated during the initialization using
   *   setTotal() and setLeft().
   * - the end of the initialization using updateTaskCompletionState().
   * <p>
   * When this method is called, a request for initialization will
   * be sent to the source server asking for initialization.
   * When this method is called, a request for initialization is sent to the
   * remote source server requesting initialization.
   * <p>
   * The {@link #exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@link #importBackend(InputStream)}
   * will be called on his server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   *                            The task state is updated.
   */
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    Message errMsg = null;
    if (debugEnabled())
      TRACER.debugInfo("Entering initializeFromRemote");
      TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
    if (!broker.isConnected())
    {
      if (initTask instanceof InitializeTask)
      {
        InitializeTask task = (InitializeTask) initTask;
        task.updateTaskCompletionState(
            new DirectoryException(
                ResultCode.OTHER, ERR_INITIALIZATION_FAILED_NOCONN.get(
                    getServiceID())));
      }
      return;
      errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
    }
    acquireIEContext(true);
    ieContext.initializeTask = initTask;
    // We must not test here whether the remote source is connected to
    // the topology by testing if it stands in the replicas list since.
    // In the case of a re-attempt of initialization, the listener thread is
    // running this method directly coming from initailize() method and did
    // not processed any topology message in between the failure and the
    // new attempt.
    try
    {
      // We must immediatly acquire a context to store the task inside
      // The context will be used when we (the listener thread) will receive
      // the InitializeTargetMsg, process the import, and at the end
      // update the task.
    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
        serviceID, serverID, source);
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
          serviceID, serverID, source, this.initWindow);
    // Publish Init request msg
    broker.publish(initializeMsg);
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
    // .. we expect to receive entries or err after that
      // The normal success processing is now to receive InitTargetMsg then
      // entries from the remote server.
      // The error cases are :
      // - either local error immediatly caught below
      // - a remote error we will receive as an ErrorMsg
    }
    catch(DirectoryException de)
    {
      errMsg = de.getMessageObject();
    }
    catch(Exception e)
    {
      // Should not happen
      errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
          e.getLocalizedMessage());
      logError(errMsg);
    }
    // When error, update the task and raise the error to the caller
    if (errMsg != null)
    {
      // No need to call here updateTaskCompletionState - will be done
      // by the caller
      releaseIEContext();
      DirectoryException de = new DirectoryException(
          ResultCode.OTHER,
          errMsg);
      throw (de);
    }
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   * Processes an InitializeTargetMsg received from a remote server
   * meaning processes an initialization from the entries expected to be
   * received now.
   *
   * @param initTargetMsgReceived The message received from the remote server.
   *
   * @param requestorServerId The serverId of the server that requested the
   *                          initialization meaning the server where the
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  void initialize(InitializeTargetMsg initializeMessage)
  throws DirectoryException
  void initialize(InitializeTargetMsg initTargetMsgReceived,
      int requestorServerId)
  {
    DirectoryException de = null;
    InitializeTask initFromtask = null;
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
    // Go into full update status
    setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
    if (initializeMessage.getRequestorID() == serverID)
    {
      // The import responds to a request we did so the IEContext
      // is already acquired
    }
    else
    {
      acquireIEContext(true);
    }
    ieContext.importSource = initializeMessage.getsenderID();
    ieContext.entryLeftCount = initializeMessage.getEntryCount();
    ieContext.setCounters(
        initializeMessage.getEntryCount(),
        initializeMessage.getEntryCount());
    int source = initTargetMsgReceived.getSenderID();
    try
    {
      // Log starting
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
          Integer.toString(serverID),
          serviceID,
          Long.toString(initTargetMsgReceived.getInitiatorID()));
      logError(msg);
      // Go into full update status
      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
      // Acquire an import context if no already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() == this.serverID)
      {
        // The initTargetMsgReceived received is the answer to a request that
        // we (this server) sent previously. In this case, so the IEContext
        // has been already acquired when the request was published in order
        // to store the task (to be updated with the status at the end).
      }
      else
      {
        // The initTargetMsgReceived is for an import initiated by the remote
        // server.
        // Test and set if no import already in progress
        acquireIEContext(true);
      }
      // Initialize stuff
      ieContext.importSource = source;
      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromtask = (InitializeTask)ieContext.initializeTask;
      // Lauch the import
      importBackend(new ReplInputStream(this));
      broker.reStart();
    }
    catch (DirectoryException e)
    {
      de = e;
      // Store the exception raised. It will be considered if no other exception
      // has been previously stored in  the context
      if (ieContext.getException() == null)
        ieContext.setException(e);
    }
    finally
    {
      if ((ieContext != null)  && (ieContext.getException() != null))
        de = ieContext.getException();
      if (debugEnabled())
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends import with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected());
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      // It is necessary to restart (reconnect to RS) for different reasons
      //   - when everything went well, reconnect in order to exchange
      //     new state, new generation ID
      //   - when we have connection failure, reconnect to retry a new import
      //     right here, right now
      // we never want retryOnFailure if we fails reconnecting in the restart.
      broker.reStart(false);
      if (ieContext.getException() != null)
      {
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(de);
        if (broker.isConnected() && (initFromtask != null)
            && (++ieContext.attemptCnt<2))
        {
          // Worth a new attempt
          // since initFromtask is in this server, connection is ok
          try
          {
            // Wait for the exporter to stabilize - eventually reconnect as
            // well if it was connected to the same RS than the one we lost ...
            Thread.sleep(1000);
            // Restart the whole import protocol exchange by sending again
            // the request
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
            broker.publish(ieContext.initReqMsgSent);
            ieContext.initializeCounters(0);
            ieContext.exception = null;
            ieContext.msgCnt = 0;
            // Processing of the received initTargetMsgReceived is done
            // let's wait for the next one
            return;
          }
          catch(Exception e)
          {
            // An error occurs when sending a new request for a new import.
            // This error is not stored, prefering to keep the initial one.
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
          }
        }
      }
      releaseIEContext();
    }
    // Sends up the root error.
    if (de != null)
      // ===================
      // No new attempt case
      if (debugEnabled())
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromtask
          + " attempt=" + ieContext.attemptCnt);
      try
      {
        if (broker.isConnected() && (ieContext.getException() != null))
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
              ieContext.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        else // !broker.isConnected()
        {
          // Don't try to reconnect here.
          // The current running thread is the listener thread and will loop on
          // receive() that is expected to manage reconnects attempt.
        }
        // Update the task that initiated the import must be the last thing.
        // Particularly, broker.restart() after import success must be done
        // before some other operations/tasks to be launched,
        // like resetting the generation ID.
        if (initFromtask != null)
        {
          initFromtask.updateTaskCompletionState(ieContext.getException());
        }
      }
      finally
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            Integer.toString(serverID),
            serviceID,
            Long.toString(initTargetMsgReceived.getInitiatorID()),
            (ieContext.getException()!=null?
                ieContext.getException().getLocalizedMessage():""));
        logError(msg);
        releaseIEContext();
      } // finally
    } // finally
  }
  /**
   * Return the protocol version of the DS related to the provided serverid.
   * Returns -1 when the protocol version is not known.
   * @param dsServerId The provided serverid.
   * @return The procotol version.
   */
  short getProtocolVersion(int dsServerId)
  {
    short protocolVersion = -1;
    for (DSInfo dsi : getReplicasList())
    {
      throw de;
      if (dsi.getDsId() == dsServerId)
      {
        protocolVersion = dsi.getProtocolVersion();
        break;
      }
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    return protocolVersion;
  }
  /**
@@ -1887,15 +2578,7 @@
    if (debugEnabled())
      TRACER.debugInfo(
          "Server id " + serverID + " and domain " + serviceID
          + "resetGenerationId" + generationIdNewValue);
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
      throw new DirectoryException(
         resultCode, message);
    }
          + " resetGenerationId " + generationIdNewValue);
    ResetGenerationIdMsg genIdMessage = null;
@@ -1907,6 +2590,16 @@
    {
      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
    }
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
          Integer.toString(serverID),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(
         resultCode, message);
    }
    broker.publish(genIdMessage);
    // check that at least one ReplicationServer did change its generation-id
@@ -2410,6 +3103,7 @@
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
  }
  /**
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -22,11 +22,12 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.messages.Message;
import org.opends.messages.TaskMessages;
import org.opends.server.types.ResultCode;
@@ -35,6 +36,7 @@
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.util.List;
@@ -68,6 +70,13 @@
  /**
   * {@inheritDoc}
   */
  public Message getDisplayName() {
    return TaskMessages.INFO_TASK_INITIALIZE_TARGET_NAME.get();
  }
  /**
   * {@inheritDoc}
   */
  @Override public void initializeTask() throws DirectoryException
  {
    if (TaskState.isDone(getTaskState()))
@@ -117,9 +126,9 @@
  protected TaskState runTask()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("DebugInfo" + "InitializeTarget Task/runTask ");
    }
      TRACER.debugInfo("[IE] InitializeTargetTask is starting on domain: "+
          domain.getServiceID());
    try
    {
      domain.initializeRemote(target, this);
@@ -128,7 +137,6 @@
    {
      // This log will go to the task log message
      MessageBuilder mb = new MessageBuilder();
      mb.append("Initialize Task stopped by error");
      mb.append(de.getMessageObject());
      logError(mb.toMessage());
opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
@@ -76,7 +76,7 @@
  // completed
  long left = 0;
  private Message initTaskError = null;
  private Message taskCompletionError = null;
  /**
   * {@inheritDoc}
@@ -139,8 +139,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("InitializeTask is starting domain: %s source:%d",
                domain.getServiceID(), source);
      TRACER.debugInfo("[IE] InitializeTask is starting on domain: %s "
          + " from source:%d", domain.getServiceID(), source);
    }
    initState = getTaskState();
    try
@@ -163,20 +163,23 @@
      replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(left));
      replaceAttributeValue(
          ATTR_TASK_INITIALIZE_DONE, String.valueOf(total-left));
      // Error raised at completion time
      if (taskCompletionError != null)
        logError(taskCompletionError);
    }
    catch(InterruptedException ie) {}
    catch(DirectoryException de)
    {
      // Error raised at submission time
      logError(de.getMessageObject());
      initState = TaskState.STOPPED_BY_ERROR;
    }
    if (initTaskError != null)
      logError(initTaskError);
    if (debugEnabled())
    {
      TRACER.debugInfo("InitializeTask is ending with state:%s",
      TRACER.debugInfo("[IE] InitializeTask is ending with state:%s",
          initState.toString());
    }
    return initState;
@@ -190,28 +193,22 @@
   */
  public void updateTaskCompletionState(DirectoryException de)
  {
    initState =  TaskState.STOPPED_BY_ERROR;
    try
    {
      if (de != null)
      {
        initTaskError = de.getMessageObject();
      }
      if (de == null)
        initState =  TaskState.COMPLETED_SUCCESSFULLY;
      else
        initState =  TaskState.STOPPED_BY_ERROR;
      if (debugEnabled())
      {
        TRACER.debugInfo("InitializeTask/setState: %s", initState);
      }
        taskCompletionError = de.getMessageObject();
    }
    finally
    {
      // Wake up runTask method waiting for completion
      synchronized (initState)
      {
        initState.notify();
      }
    }
    catch(Exception e)
    {}
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -254,6 +254,7 @@
    ECLAfterChangelogTrim();replicationServer.clearDb();
    // Write changes and read ECL from start
    sleep(500); // Wait for draftCNDb to be purged also
    int ts = ECLCompatWriteReadAllOps(1);
    // Write additional changes and read ECL from a provided draft change number
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -1020,6 +1020,10 @@
      addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
      // S2 should be re-initialized and have a new valid genId
      // Signal that we just entered the full update status
      broker2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      int receivedEntriesNb = this.receiveImport(broker2, server2ID, null);
      debugInfo("broker2 has been initialized from DS with #entries=" + receivedEntriesNb);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -61,10 +61,13 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -146,9 +149,11 @@
  boolean emptyOldChanges = true;
  LDAPReplicationDomain replDomain = null;
  int initWindow = 100;
  private void log(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "InitOnLineTests/" + s));
    if (debugEnabled())
    {
@@ -183,7 +188,8 @@
    // clear it.
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    updatedEntries = newLDIFEntries();
    // For most tests, a limited number of entries is enough
    updatedEntries = newLDIFEntries(2);
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
@@ -283,6 +289,7 @@
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
  {
    log("waitTaskCompleted " + taskEntry.toLDIFString());
    try
    {
      // FIXME - Factorize with TasksTestCase
@@ -398,6 +405,22 @@
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        addTestEntryToDB(entry);
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  private void addTestEntryToDB(Entry entry)
  {
    try
    {
        AddOperationBasis addOp = new AddOperationBasis(connection,
            InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
@@ -411,18 +434,17 @@
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  private String[] newLDIFEntries(int entriesCnt)
  {
    // It is relevant to test ReplLDIFInputStream
    // and ReplLDIFOutputStream with big entries
@@ -430,46 +452,76 @@
    for (int i=0; i<bigAttributeValue.length; i++)
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries =
    {
    String[] entries = new String[entriesCnt + 2];
    String filler = "000000000000000000000000000000000000";
    entries[0] = new String(
        "dn: " + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        + "\n");
    entries[1] = new String(
          "dn: ou=People," + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "ou: People\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people," + EXAMPLE_DN + "\n"
        + "\n");
    for (int i=0; i<entriesCnt; i++)
    {
      String useri="0000"+i;
      entries[i+2] = new String(
          "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "cn: "+useri+"_cn"+"\n"
        + "sn: "+useri+"_sn"+"\n"
        + "uid: "+useri+"_uid"+"\n"
        + "telephonenumber:: "+ Base64.encode(
            new String(bigAttributeValue).getBytes())+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
          "dn: cn=Robert Langman,ou=people," + EXAMPLE_DN + "\n"
        + "entryUUID: 21111111-1111-1111-1111-"+useri+
        filler.substring(0, 12-useri.length())+"\n"
        + "\n");
    };
    return entries;
  }
  /*
   * Creates entries necessary to the test.
   */
  private String newLDIFEntry(int entryCnt)
  {
    // It is relevant to test ReplLDIFInputStream
    // and ReplLDIFOutputStream with big entries
    char bigAttributeValue[] = new char[30240];
    for (int i=0; i<bigAttributeValue.length; i++)
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String filler = "000000000000000000000000000000000000";
    String useri="0000"+entryCnt;
    return  new String(
        "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
        };
        + "cn: "+useri+"_cn"+"\n"
        + "sn: "+useri+"_sn"+"\n"
        + "uid: "+useri+"_uid"+"\n"
        + "telephonenumber:: "+ Base64.encode(
            new String(bigAttributeValue).getBytes())+"\n"
            + "entryUUID: 21111111-1111-1111-1111-"+useri+
            filler.substring(0, 12-useri.length())+"\n"
            + "\n");
    return entries;
  }
  /**
@@ -488,15 +540,16 @@
      RoutableMsg initTargetMessage =
        new InitializeTargetMsg(
          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
          updatedEntries.length);
          updatedEntries.length, initWindow);
      broker.publish(initTargetMessage);
      int cnt = 0;
      for (String entry : updatedEntries)
      {
        log("Broker will pusblish 1 entry: bytes:"+ entry.length());
        log("Broker will publish 1 entry: bytes:"+ entry.length());
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes());
            entry.getBytes(), ++cnt);
        broker.publish(entryMsg);
      }
@@ -559,7 +612,7 @@
      }
      catch (SocketTimeoutException e)
      {
        log("SocketTimeoutException while waiting fro entries" +
        log("SocketTimeoutException while waiting for entries" +
            stackTraceToSingleLineString(e));
      }
      catch(Exception e)
@@ -571,6 +624,11 @@
    assertTrue(entriesReceived == updatedEntries.length,
        " Received entries("+entriesReceived +
        ") == Expected entries("+updatedEntries.length+")");
    broker.setGenerationID(EMPTY_DN_GENID);
    broker.reStart(true);
    try { Thread.sleep(500); } catch(Exception e) {}
  }
  /**
@@ -643,6 +701,11 @@
   */
  private void connectServer1ToChangelog(int changelogID)
  {
    connectServer1ToChangelog(changelogID, 0);
  }
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
  {
    // Connect DS to the replicationServer
    try
    {
@@ -651,7 +714,7 @@
      String synchroServerLdif =
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectC7lass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
@@ -659,7 +722,7 @@
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
//    + "ds-cfg-heartbeat-interval: 0 ms\n"
      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
      + "ds-cfg-window-size: " + WINDOW_SIZE;
@@ -706,11 +769,19 @@
  /**
   * Tests the import side of the Initialize task
   * Test steps :
   * - create a task 'InitFromS2' in S1
   * - make S2 export its entries
   * - test that S1 has succesfully imported the entries and completed the task.
   *
   * TODO: Error case: make S2 crash/disconnect in the middle of the export
   * and test that, on S1 side, the task ends with an error.
   * State of the backend on S1 partially initialized: ?
   */
  @Test(enabled=true, groups="slow")
  public void initializeImport() throws Exception
  {
    String testCase = "initializeImport";
    String testCase = "initializeImport ";
    log("Starting "+testCase);
@@ -740,8 +811,8 @@
      InitializeRequestMsg initMsg = (InitializeRequestMsg)msg;
      // S2 publishes entries to S1
      makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
          initMsg.getsenderID());
      makeBrokerPublishEntries(server2, server2ID, initMsg.getSenderID(),
          initMsg.getSenderID());
      // Wait for task (import) completion in S1
      waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
@@ -757,12 +828,16 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Tests the export side of the Initialize task
   * Test steps :
   * - add entries in S1, make S2 publish InitRequest
   * - test that S1 has succesfully exported the entries (by receiving them
   *   on S2 side).
   */
  @Test(enabled=true, groups="slow")
  public void initializeExport() throws Exception
@@ -789,20 +864,27 @@
      // Thread.sleep(3000);
      InitializeRequestMsg initMsg = new InitializeRequestMsg(EXAMPLE_DN,
        server2ID, server1ID);
        server2ID, server1ID, 100);
      server2.publish(initMsg);
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
}
  /**
   * Tests the import side of the InitializeTarget task
   * Test steps :
   * - add entries in S1 and create a task 'InitTargetS2' in S1
   * - wait task completed
   * - test that S2 has succesfully received the entries
   */
  @Test(enabled=true, groups="slow")
  public void initializeTargetExport() throws Exception
@@ -832,21 +914,33 @@
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      // Tests that entries have been received by S2
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Tests the import side of the InitializeTarget task
   * Test steps :
   * - addEntries in S1, create a task 'InitAll' in S1
   * - wait task completed on S1
   * - test that S2 and S3 have succesfully imported the entries.
   *
   * TODO: Error case: make S1 crash in the middle of the export and test that
   * the task ends with an error. State of the backend on both S2 and S3: ?
   *
   * TODO: Error case: make S2 crash in the middle of the import and test what??
   */
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportAll() throws Exception
@@ -879,17 +973,22 @@
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Tests that entries have been received by S2
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
      // Wait for task completion
      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -921,13 +1020,20 @@
      // wait until the replication domain has expected generationID
      // this should indicate that the import occured correctly.
      for (int count = 0; count < 100; count++)
      for (int count = 0; count < 120; count++)
      {
        if (replDomain.getGenerationID() == 56869)
        if (replDomain.getGenerationID() == 53235)
          break;
        Thread.sleep(200);
        log(testCase + " genId=" + replDomain.getGenerationID());
        Thread.sleep(1000);
      }
      if (replDomain.getGenerationID() != 53235)
      {
        fail(testCase + " Import success waited longer than expected \n" +
            TestCaseUtils.threadStacksToString());
      }
      // Test that entries have been imported in S1
      testEntriesInDb();
@@ -938,7 +1044,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -993,7 +1099,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1052,7 +1158,8 @@
          "ds-task-initialize-domain-dn: " + baseDn,
          "ds-task-initialize-replica-server-id: -3");
      addTask(taskInit, ResultCode.OTHER,
          ERR_INVALID_IMPORT_SOURCE.get());
          ERR_INVALID_IMPORT_SOURCE.get(baseDn.toNormalizedString(),
              Integer.toString(server1ID),"-3",""));
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
@@ -1064,7 +1171,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1156,23 +1263,24 @@
    // TODO Test ReplicationServerDomain.getDestinationServers method.
      log("Successfully ending " + testCase);
    } finally
    {
      if (broker2 != null)
        broker2.stop();
      if (broker3 != null)
        broker3.stop();
      afterTest();
      afterTest(testCase);
    }
  }
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportMultiSS() throws Exception
  {
    String testCase = "initializeTargetExportMultiSS";
    try
    {
      String testCase = "initializeTargetExportMultiSS";
      log("Starting " + testCase);
      // Create 2 changelogs
@@ -1190,6 +1298,7 @@
      // connected to changelog2
      if (server2 == null)
      {
        log(testCase + " Will connect server 2 to " + changelog2ID);
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
            server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      }
@@ -1197,19 +1306,40 @@
     // Thread.sleep(1000);
      // Launch in S1 the task that will initialize S2
      log(testCase + " add task " + Thread.currentThread());
      addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log(testCase + " " + server2.getServerId() + " wait target " + Thread.currentThread());
      ReplicationMsg msgrcv;
      do
      {
        msgrcv = server2.receive();
        log(testCase + " " + server2.getServerId() + " receives " + msgrcv);
      }
      while(!(msgrcv instanceof InitializeTargetMsg));
      assertTrue(msgrcv instanceof InitializeTargetMsg, msgrcv.getClass().getCanonicalName());
      // Signal RS we just entered the full update status
      log(testCase + " change status");
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      // Tests that entries have been received by S2
      log(testCase + " receive entries");
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      // Wait for task completion
      log(testCase + " wait task completed");
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      log(testCase + e.getLocalizedMessage());
    }
    finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1263,17 +1393,34 @@
      // S3 sends init request
      log(testCase + " server 3 Will send reqinit to " + server1ID);
      InitializeRequestMsg initMsg =
        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID);
        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID, 100);
      server3.publish(initMsg);
      // S3 should receive target, entries & done
      log(testCase + " Wait for InitializeTargetMsg");
      ReplicationMsg msgrcv = null;
      do
      {
        msgrcv = server3.receive();
        log(testCase + " receives  "+ msgrcv);
      }
      while (!(msgrcv instanceof InitializeTargetMsg));
      assertTrue(msgrcv instanceof InitializeTargetMsg,msgrcv.getClass().getCanonicalName() +
      msgrcv);
      // Signal RS we just entered the full update status
      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      log(testCase + " Will verify server 3 has received expected entries");
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
      log(testCase + " Will verify no more msgs");
      while (true)
      {
        try
        {
          log(testCase + " Will receive");
          ReplicationMsg msg = server3.receive();
          fail("Receive unexpected message " + msg);
        } catch (SocketTimeoutException e)
@@ -1282,11 +1429,11 @@
          break;
        }
      }
      log("Successfully ending " + testCase);
    } finally
    }
    finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1318,7 +1465,8 @@
      addTask(taskInit, ResultCode.SUCCESS, null);
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
            baseDn.toString(), "20"));
      // Test 2
      taskInit = TestCaseUtils.makeEntry(
@@ -1331,7 +1479,9 @@
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server1ID);
      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get(
          baseDn.toNormalizedString(),
          Integer.toString(server1ID),"20",""));
      if (replDomain != null)
      {
@@ -1342,7 +1492,7 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1376,7 +1526,7 @@
      addTask(taskInit, ResultCode.SUCCESS, null);
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDn.toString(), "0"));
      if (replDomain != null)
      {
@@ -1387,7 +1537,7 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1483,14 +1633,15 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @param testCase The name of the test case.
   */
  protected void afterTest()
  protected void afterTest(String testCase)
  {
    // Check that the domain has completed the import/export task.
@@ -1564,6 +1715,7 @@
    {
      replServerPort[i] = 0;
    }
    log("Successfully cleaned " + testCase);
  }
    /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Copyright 2007-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
@@ -389,7 +389,18 @@
  {
    return this.eclIncludes;
  }
  public long getInitializationHeartbeatInterval()
  {
    return 180;
  }
  
  public int getInitializationWindowSize()
  {
    return 100;
  }
  public boolean hasExternalChangelogDomain() { return true; }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
@@ -104,6 +104,7 @@
  private static final int EXCLUDE_FRAC_MODE = 0;
  private static final int INCLUDE_FRAC_MODE = 1;
  int initWindow = 100;
  private ChangeNumberGenerator gen = null;
  // The tracer object for the debug logger
@@ -688,7 +689,7 @@
      long heartbeatInterval,
      long generationId) throws ConfigException
    {
      super(serviceID, serverID);
      super(serviceID, serverID, 100);
      generationID = generationId;
      startPublishService(replicationServers, window, heartbeatInterval, 500);
      startListenService();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -127,6 +127,7 @@
  private void endTest()
  {
    debugInfo("endTest");
    for (int i = 0 ; i < NDS; i++)
    {
      if (rd[i] != null)
@@ -155,6 +156,7 @@
      }
      rsPort[i] = -1;
    }
    debugInfo("endTest done");
  }
  /**
@@ -345,7 +347,7 @@
   * and check the DSs are correctly spread across the RSs
   * @throws Exception If a problem occurred
   */
  @Test
  @Test (enabled=true)
  public void testSpreadLoad() throws Exception
  {
    String testCase = "testSpreadLoad";
@@ -1051,7 +1053,7 @@
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  @Test (enabled=true,groups = "slow")
  public void testNoYoyo1() throws Exception
  {
    String testCase = "testNoYoyo1";
@@ -1141,7 +1143,7 @@
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  @Test (enabled=true, groups = "slow")
  public void testNoYoyo2() throws Exception
  {
    String testCase = "testNoYoyo2";
@@ -1234,7 +1236,7 @@
   * disconnections/reconnections after the very first connections.
   * @throws Exception If a problem occurred
   */
  @Test (groups = "slow")
  @Test (enabled=true, groups = "slow")
  public void testNoYoyo3() throws Exception
  {
    String testCase = "testNoYoyo3";
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
@@ -99,6 +99,7 @@
  private ReplicationServer rs1 = null;
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private int initWindow = 100;
  private void debugInfo(String s)
  {
@@ -934,7 +935,7 @@
      initTargetMsg =
          new InitializeTargetMsg(EXAMPLE_DN, serverId, destId,
          serverId, nEntries);
          serverId, nEntries, initWindow);
      rb.publish(initTargetMsg);
@@ -944,14 +945,14 @@
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 11111111-1111-1111-1111-111111111111\n\n";
      EntryMsg entryMsg = new EntryMsg(serverId, destId, topEntry.getBytes());
      EntryMsg entryMsg = new EntryMsg(serverId, destId, topEntry.getBytes(), 1);
      rb.publish(entryMsg);
    }
    private EntryMsg createNextEntryMsg()
    {
      String userEntryUUID = "11111111-1111-1111-1111-111111111111";
      long curId = userId++;
      long curId = ++userId;
      String userdn = "uid=full_update_user" + curId + "," + EXAMPLE_DN;
      String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" +
        "objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -972,7 +973,7 @@
      // Create an entry message
      EntryMsg entryMsg = new EntryMsg(serverId, destId,
        entryWithUUIDldif.getBytes());
        entryWithUUIDldif.getBytes(), (int)userId);
      return entryMsg;
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -867,6 +867,7 @@
    int assuredSdLevel = -100;
    SortedSet<String> refUrls = null;
    Set<String> eclIncludes = new HashSet<String>();
    short protocolVersion = 4;
    switch (dsId)
      {
@@ -921,7 +922,7 @@
    }
    return new DSInfo(dsId, rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
       (byte)assuredSdLevel, groupId, urls, eclIncludes);
       (byte)assuredSdLevel, groupId, urls, eclIncludes, protocolVersion);
  }
  /**
@@ -1122,8 +1123,9 @@
     byte groupId = rd.getGroupId();
     List<String> refUrls = rd.getRefUrls();
     Set<String> eclInclude = rd.getEclInclude();
     short protocolVersion = 4;
     DSInfo dsInfo = new DSInfo(dsId, rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assuredMode,
       safeDataLevel, groupId, refUrls, eclInclude);
       safeDataLevel, groupId, refUrls, eclInclude, protocolVersion);
     dsList.add(dsInfo);
     TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -63,6 +64,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
import static org.opends.messages.ReplicationMessages.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -553,7 +555,7 @@
   * Test that various combinations of ModifyMsg encoding and decoding
   * using protocol V1 and VLAST are working.
   */
  @Test(dataProvider = "createModifyData")
  @Test(enabled=false,dataProvider = "createModifyData")
  public void modifyMsgTestVLASTV1(ChangeNumber changeNumber,
                               String rawdn, List<Modification> mods,
                               boolean isAssured, AssuredMode assuredMode,
@@ -1067,16 +1069,16 @@
    urls4.add("ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
    DSInfo dsInfo1 = new DSInfo(13, 26, (long)154631, ServerStatus.FULL_UPDATE_STATUS,
      false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>());
      false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>(), (short)-1);
    DSInfo dsInfo2 = new DSInfo(-436, 493, (long)-227896, ServerStatus.DEGRADED_STATUS,
      true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>());
      true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>(), (short)-1);
    DSInfo dsInfo3 = new DSInfo(2436, 591, (long)0, ServerStatus.NORMAL_STATUS,
      false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>());
      false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>(), (short)-1);
    DSInfo dsInfo4 = new DSInfo(415, 146, (long)0, ServerStatus.BAD_GEN_ID_STATUS,
      true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>());
      true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>(), (short)-1);
    List<DSInfo> dsList1 = new ArrayList<DSInfo>();
    dsList1.add(dsInfo1);
@@ -1143,4 +1145,295 @@
    BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
    assertEquals(bi.toString(16), oldPdu);
  }
}
  @DataProvider(name="createEntryMsgData")
  public Object [][] createEntryMsgData() throws Exception
  {
    int sid = 1;
    int dest = 2;
    byte[] entryBytes = "12".getBytes();
    int pos = 0;
    int length = 2;
    int msgid = 14;
    Object[] set1 = new Object[] {sid, dest, entryBytes, pos, length, msgid};
    return new Object [][] { set1};
  }
  /**
   * Test that various combinations of EntryMsg encoding and decoding
   * using protocol VLAST and V3 are working.
   */
  @Test(enabled=true, dataProvider="createEntryMsgData")
  public void entryMsgTestVLASTV3(int sid, int dest, byte[] entryBytes,
      int pos, int length, int msgId) throws Exception
  {
    // Create VLAST message
    EntryMsg msg = new EntryMsg(sid, dest, entryBytes, pos, length, msgId);
    // Serialize in V3
    byte[] v3MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Un-serialize V3 message
    EntryMsg newMsg = new EntryMsg(v3MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Check fields common to both versions
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
    // Check default value for only post V3 fields
    assertEquals(newMsg.getMsgId(), -1);
    // Set again only post V3 fields
    newMsg.setMsgId(msgId);
    // Serialize in VLAST
    EntryMsg vlastMsg = new EntryMsg(newMsg.getBytes(),REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
    assertEquals(msg.getDestination(), vlastMsg.getDestination());
    assertEquals(msg.getEntryBytes(), vlastMsg.getEntryBytes());
    assertEquals(msg.getMsgId(), vlastMsg.getMsgId());
  }
  @DataProvider(name="createErrorMsgData")
  public Object [][] createErrorMsgData() throws Exception
  {
    int sender = 1;
    int dest = 2;
    Message message = ERR_UNKNOWN_TYPE.get("toto");
    Object[] set1 = new Object[] {sender, dest, message};
    return new Object [][] { set1};
  }
  /**
   * Test that various combinations of ErrorMsg encoding and decoding
   * using protocol VLAST and V3 are working.
   */
  @Test(enabled=true, dataProvider="createErrorMsgData")
  public void errorMsgTestVLASTV3(int sender, int dest, Message message)
  throws Exception
  {
    // Create VLAST message
    ErrorMsg msg = new ErrorMsg(sender, dest, message);
    long creatTime = msg.getCreationTime();
    // Serialize in V3
    byte[] v3MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Un-serialize V3 message
    ErrorMsg newMsg = new ErrorMsg(v3MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Check fields common to both versions
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    // Set again only post V3 fields
    newMsg.setCreationTime(creatTime);
    // Serialize in VLAST
    ErrorMsg vlastMsg = new ErrorMsg(newMsg.getBytes(),
        REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
    assertEquals(msg.getDestination(), vlastMsg.getDestination());
    assertEquals(msg.getMsgID(), vlastMsg.getMsgID());
    assertEquals(msg.getCreationTime(), vlastMsg.getCreationTime());
  }
  @DataProvider(name="createInitializationRequestMsgData")
  public Object [][] createInitializationRequestMsgData() throws Exception
  {
    int sender = 1;
    int dest = 2;
    String baseDn = "dc=whatever";
    int initWindow = 22;
    Object[] set1 = new Object[] {sender, dest, baseDn, initWindow };
    return new Object [][] { set1};
  }
  /**
   * Test that various combinations of ErrorMsg encoding and decoding
   * using protocol VLAST and V3 are working.
   */
  @Test(enabled=true, dataProvider="createInitializationRequestMsgData")
  public void initializationRequestMsgTestVLASTV3(int sender, int dest,
      String baseDn, int initWindow)
  throws Exception
  {
    // Create VLAST message
    InitializeRequestMsg msg = new InitializeRequestMsg(baseDn, sender, dest, initWindow);
    // Serialize in V3
    byte[] v3MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Un-serialize V3 message
    InitializeRequestMsg newMsg = new InitializeRequestMsg(v3MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Check fields common to both versions
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    // Check default value for only post V3 fields
    assertEquals(newMsg.getInitWindow(), 0);
    // Set again only post V3 fields
    newMsg.setInitWindow(initWindow);
    // Serialize in VLAST
    InitializeRequestMsg vlastMsg = new InitializeRequestMsg(newMsg.getBytes(),
        REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
    assertEquals(msg.getDestination(), vlastMsg.getDestination());
    assertEquals(msg.getBaseDn(), vlastMsg.getBaseDn());
    assertEquals(msg.getInitWindow(), vlastMsg.getInitWindow());
  }
  @DataProvider(name="createInitializeTargetMsgData")
  public Object [][] createInitializeTargetMsgData() throws Exception
  {
    int sender = 1;
    int dest = 2;
    int initiator = 3;
    String baseDn = "dc=whatever";
    int entryCount = 56;
    int initWindow = 22;
    Object[] set1 = new Object[] {sender, dest, initiator, baseDn,
        entryCount, initWindow };
    return new Object [][] { set1};
  }
  /**
   * Test that various combinations of ErrorMsg encoding and decoding
   * using protocol VLAST and V3 are working.
   */
  @Test(enabled=true, dataProvider="createInitializeTargetMsgData")
  public void initializeTargetMsgTestVLASTV3(int sender, int dest,
      int initiator, String baseDn, int entryCount, int initWindow)
  throws Exception
  {
    // Create VLAST message
    InitializeTargetMsg msg = new InitializeTargetMsg(baseDn, sender, dest,
        initiator, entryCount, initWindow);
    // Serialize in V3
    byte[] v3MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Un-serialize V3 message
    InitializeTargetMsg newMsg = new InitializeTargetMsg(v3MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    // Check fields common to both versions
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getBaseDN(), newMsg.getBaseDN());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertEquals(msg.getInitiatorID(), newMsg.getInitiatorID());
    // Check default value for only post V3 fields
    assertEquals(newMsg.getInitWindow(), 0);
    // Set again only post V3 fields
    newMsg.setInitWindow(initWindow);
    // Serialize in VLAST
    InitializeTargetMsg vlastMsg = new InitializeTargetMsg(newMsg.getBytes(),
        REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
    assertEquals(msg.getDestination(), vlastMsg.getDestination());
    assertEquals(msg.getBaseDN(), vlastMsg.getBaseDN());
    assertEquals(msg.getEntryCount(), vlastMsg.getEntryCount());
    assertEquals(msg.getInitiatorID(), vlastMsg.getInitiatorID());
    assertEquals(msg.getInitWindow(), vlastMsg.getInitWindow());
  }
  @DataProvider(name = "createEntryMsgV3")
  public Object[][] createEntryMsgV3()
  {
    return new Object[][] {
        {"0c32003100646e3a206f753d50656f706c652c64633d6578616d706c652c64633d636f6d0a6f626a656374436c6173733a20746f700a6f626a656374436c6173733a206f7267616e697a6174696f6e616c556e69740a6f753a2050656f706c650a656e747279555549443a2032313131313131312d313131312d313131312d313131312d3131313131313131313131320a0a00",
          1, 2}};
  }
  @Test(dataProvider = "createEntryMsgV3")
  public void entryMsgPDUV3(
      String pduV3, int dest, int sender) throws Exception
  {
    // this msg is changed by V4, so we want to test that V>3 server can
    // build a V>3 version when it receives a V3 PDU from a V3 server.
    EntryMsg msg = new EntryMsg(hexStringToByteArray(pduV3),
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    assertEquals(msg.getDestination(), dest, "Expected:" + dest);
    assertEquals(msg.getSenderID(), sender, "Expected:" + sender);
    assertEquals(msg.getMsgId(), -1, "Expected:-1");
    // we should test EntryBytes
  }
  @DataProvider(name = "createErrorMsgV3")
  public Object[][] createErrorMsgV3()
  {
    return new Object[][] {
        {"0e380039003135313338383933004f6e207375666669782064633d6578616d706c652c64633d636f6d2c207265706c69636174696f6e2073657276657220392070726573656e7465642067656e65726174696f6e2049443d2d31207768656e2065787065637465642067656e65726174696f6e2049443d343800",
          9, 8, "On suffix dc=example,dc=com, replication server 9 presented generation ID=-1 when expected generation ID=48"}};
  }
  @Test(dataProvider = "createErrorMsgV3")
  public void errorMsgPDUV3(
      String pduV3, int dest, int sender, String errorDetails) throws Exception
  {
    // this msg is changed by V4, so we want to test that V>3 server can
    // build a V>3 version when it receives a V3 PDU from a V3 server.
    ErrorMsg msg = new ErrorMsg(hexStringToByteArray(pduV3),
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    assertEquals(msg.getDestination(), 9, "Expected:"+9);
    assertEquals(msg.getSenderID(), 8, "Expected:"+8);
    assertTrue(0==msg.getDetails().toString().compareTo(errorDetails));
  }
  @DataProvider(name = "initializeTargetMsgV3")
  public Object[][] createInitializeTargetMsgV3()
  {
    return new Object[][] {
        {"0b320064633d6578616d706c652c64633d636f6d00310032003400",
          "dc=example,dc=com", 2, 1, 4}};
  }
  @Test(dataProvider = "createInitializeTargetMsgV3")
  public void initializeTargetMsgPDUV3(
      String pduV3, String baseDN, int dest, int sender, int entryCount) throws Exception
  {
    // this msg is changed by V4, so we want to test that V>3 server can
    // build a V>3 version when it receives a V3 PDU from a V3 server.
    InitializeTargetMsg msg = new InitializeTargetMsg(hexStringToByteArray(pduV3),
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    assertEquals(msg.getDestination(), dest);
    assertEquals(msg.getSenderID(), sender);
    assertEquals(msg.getBaseDN().toString(), baseDN);
    assertEquals(msg.getEntryCount(), entryCount);
  }
  @DataProvider(name = "initializeRequestMsgV3")
  public Object[][] createInitializeRequestMsgV3()
  {
    return new Object[][] {
        {"0a64633d6578616d706c652c64633d636f6d0032003100",
          "dc=example,dc=com", 1, 2}};
  }
  @Test(dataProvider = "createInitializeRequestMsgV3")
  public void initializeRequestMsgPDUV3(
      String pduV3, String baseDN, int dest, int sender) throws Exception
  {
    // this msg is changed by V4, so we want to test that V>3 server can
    // build a V>3 version when it receives a V3 PDU from a V3 server.
    InitializeRequestMsg msg = new InitializeRequestMsg(hexStringToByteArray(pduV3),
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    assertEquals(msg.getDestination(), dest);
    assertEquals(msg.getSenderID(), sender);
    assertEquals(msg.getBaseDn().toString(), baseDN);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -581,8 +581,8 @@
    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
        .getBytes(), ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getBytes(), generatedMsg.getBytes());
    assertEquals(msg.toString(), generatedMsg.toString());
    assertEquals(generatedMsg.getBytes(), msg.getBytes());
    assertEquals(generatedMsg.toString(), msg.toString());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.getParentUid(), msg.getParentUid());
@@ -997,19 +997,19 @@
    Set<String> a4 = new HashSet<String>();
    DSInfo dsInfo1 = new DSInfo(13, 26, (long)154631, ServerStatus.FULL_UPDATE_STATUS,
      false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, a1);
      false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, a1, (short)1);
    DSInfo dsInfo2 = new DSInfo(-436, 493, (long)-227896, ServerStatus.DEGRADED_STATUS,
      true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, a2);
      true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, a2, (short)2);
    DSInfo dsInfo3 = new DSInfo(2436, 591, (long)0, ServerStatus.NORMAL_STATUS,
      false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, a3);
      false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, a3, (short)3);
    DSInfo dsInfo4 = new DSInfo(415, 146, (long)0, ServerStatus.BAD_GEN_ID_STATUS,
      true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, a4);
      true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, a4, (short)4);
    DSInfo dsInfo5 = new DSInfo(452436, 45591, (long)0, ServerStatus.NORMAL_STATUS,
        false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, a1);
        false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, a1, (short)5);
    List<DSInfo> dsList1 = new ArrayList<DSInfo>();
    dsList1.add(dsInfo1);
@@ -1198,7 +1198,7 @@
    MonitorRequestMsg msg = new MonitorRequestMsg(1,2);
    MonitorRequestMsg newMsg = new MonitorRequestMsg(msg.getBytes());
    assertEquals(newMsg.getDestination(), 2);
    assertEquals(newMsg.getsenderID(), 1);
    assertEquals(newMsg.getSenderID(), 1);
  }
  /**
@@ -1287,7 +1287,7 @@
      }
    }
    assertEquals(newMsg.getsenderID(), msg.getsenderID());
    assertEquals(newMsg.getSenderID(), msg.getSenderID());
    assertEquals(newMsg.getDestination(), msg.getDestination());
  }
@@ -1310,9 +1310,9 @@
    int sender = 1;
    int target = 45678;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMsg msg = new EntryMsg(sender, target, entry);
    EntryMsg newMsg = new EntryMsg(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    EntryMsg msg = new EntryMsg(sender, target, entry, 1);
    EntryMsg newMsg = new EntryMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
  }
@@ -1326,9 +1326,9 @@
    int sender = 1;
    int target = 56789;
    InitializeRequestMsg msg = new InitializeRequestMsg(
        TEST_ROOT_DN_STRING, sender, target);
    InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
        TEST_ROOT_DN_STRING, sender, target, 100);
    InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
  }
@@ -1343,19 +1343,20 @@
    int targetID = 2;
    int requestorID = 3;
    long entryCount = 4;
    int initWindow = 100;
    InitializeTargetMsg msg = new InitializeTargetMsg(
        TEST_ROOT_DN_STRING, senderID, targetID, requestorID, entryCount);
    InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
        TEST_ROOT_DN_STRING, senderID, targetID, requestorID, entryCount, initWindow);
    InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
    assertEquals(msg.getInitiatorID(), newMsg.getInitiatorID());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
    assertEquals(senderID, newMsg.getsenderID());
    assertEquals(senderID, newMsg.getSenderID());
    assertEquals(targetID, newMsg.getDestination());
    assertEquals(requestorID, newMsg.getRequestorID());
    assertEquals(requestorID, newMsg.getInitiatorID());
    assertEquals(entryCount, newMsg.getEntryCount());
    assertTrue(TEST_ROOT_DN_STRING.equals(newMsg.getBaseDN())) ;
@@ -1369,7 +1370,7 @@
  {
    DoneMsg msg = new DoneMsg(1, 2);
    DoneMsg newMsg = new DoneMsg(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }
@@ -1380,8 +1381,8 @@
  public void errorMsgTest() throws Exception
  {
    ErrorMsg msg = new ErrorMsg(1, 2, Message.raw("details"));
    ErrorMsg newMsg = new ErrorMsg(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    ErrorMsg newMsg = new ErrorMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    assertEquals(msg.getDetails(), newMsg.getDetails());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -66,7 +66,7 @@
      long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(serviceID, serverID);
    super(serviceID, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.queue = queue;
@@ -82,7 +82,7 @@
      StringBuilder importString,
      int exportedEntryCount) throws ConfigException
  {
    super(serviceID, serverID);
    super(serviceID, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.exportString = exportString;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -62,7 +62,7 @@
      long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(serviceID, serverID);
    super(serviceID, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.queue = queue;
@@ -77,7 +77,7 @@
      String exportString,
      StringBuilder importString) throws ConfigException
  {
    super(serviceID, serverID);
    super(serviceID, serverID, 100);
    startPublishService(replicationServers, window, heartbeatInterval, 500);
    startListenService();
    this.exportString = exportString;