mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
fix for #1733 & #845 - Initialization of replication
3 files added
41 files modified
4763 ■■■■ changed files
opends/resource/config/config.ldif 1 ●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif 12 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties 51 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/SchemaBackend.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/config/ConfigConstants.java 24 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/SchemaConfigManager.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java 13 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 113 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 673 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java 42 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java 51 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java 78 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/SocketSession.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartMessage.java 34 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 52 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationCache.java 346 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 29 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 481 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 139 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 313 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 151 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTask.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java 128 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/types/Schema.java 33 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 1401 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 137 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 14 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 144 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java 10 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 73 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 42 ●●●● patch | view | raw | blame | history
opends/resource/config/config.ldif
@@ -57,6 +57,7 @@
ds-cfg-allowed-task: org.opends.server.tasks.ImportTask
ds-cfg-allowed-task: org.opends.server.tasks.InitializeTargetTask
ds-cfg-allowed-task: org.opends.server.tasks.InitializeTask
ds-cfg-allowed-task: org.opends.server.tasks.SetGenerationIdTask
ds-cfg-allowed-task: org.opends.server.tasks.LeaveLockdownModeTask
ds-cfg-allowed-task: org.opends.server.tasks.RebuildTask
ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask
opends/resource/schema/02-config.ldif
@@ -1515,6 +1515,10 @@
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.450 NAME 'ds-cfg-message-body'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.479
  NAME 'ds-sync-generation-id'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15  USAGE directoryOperation SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.451
  NAME 'ds-task-import-clear-backend'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
@@ -1559,6 +1563,10 @@
  NAME 'ds-cfg-notification-sender-address'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.480
  NAME 'ds-task-reset-generation-id-domain-base-dn'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.466
  NAME 'ds-cfg-plugin-order-subordinate-modify-dn'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
@@ -2251,6 +2259,10 @@
  MUST ds-task-disconnect-connection-id
  MAY ( ds-task-disconnect-message $ ds-task-disconnect-notify-client )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.124
  NAME 'ds-task-reset-generation-id' SUP ds-task
  MUST ( ds-task-reset-generation-id-domain-base-dn )
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.121
  NAME 'ds-cfg-regular-expression-identity-mapper' SUP ds-cfg-identity-mapper
  STRUCTURAL MUST ( ds-cfg-match-attribute $ ds-cfg-match-pattern )
opends/src/messages/messages/replication.properties
@@ -62,10 +62,10 @@
MILD_ERR_UNKNOWN_TYPE_7=Unknown operation type : %s
MILD_ERR_OPERATION_NOT_FOUND_IN_PENDING_9=Internal Error : Operation %s \
 change number %s was not found in pending list
MILD_ERR_COULD_NOT_INITIALIZE_DB_10=Changelog failed to start because the \
MILD_ERR_COULD_NOT_INITIALIZE_DB_10=The replication server failed to start because the \
 database %s could not be opened
MILD_ERR_COULD_NOT_READ_DB_11=Changelog failed to start because the database \
 %s could not be read
MILD_ERR_COULD_NOT_READ_DB_11=The replication server failed to start because the database \
 %s could not be read : %s
MILD_ERR_EXCEPTION_REPLAYING_OPERATION_12=An Exception was caught while \
 replaying operation %s : %s
MILD_ERR_NEED_CHANGELOG_PORT_13=The replication server port must be defined
@@ -77,15 +77,15 @@
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
 listening on %s
NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
 changes that this server has already processed
 changes that this server has already processed on suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught exception during initial \
 communication with replication server: %s
NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
 communication on domain %s with replication server %s : %s
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
 replication server that has seen all the local changes. Going to replay \
 replication server that has seen all the local changes on suffix %s. Going to replay \
 changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
@@ -175,7 +175,7 @@
NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
 has been dropped by the Replication Server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \
 while sending a Server  Info message to %s. This connection is going to be \
 while sending a Server Info message to %s. This connection is going to be \
 closed and reopened
SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \
 while sending an Error Message to %s. This connection is going to be closed \
@@ -186,11 +186,42 @@
 ChangeNumber %s error %s %s
MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \
 information for attribute %s which is not defined in the schema. This \
 information will be ignored
  information will be ignored
NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \
 be closed : %s
SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \
 replication server port could not be stopped : %s
DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \
 listening on the replication port
 listening on the replication port
SEVERE_ERR_SEARCHING_GENERATION_ID_73=An unexpected error %s occured when \
searching for generation id for domain : %s
SEVERE_ERR_SEARCHING_DOMAIN_BACKEND_74=An unexpected error occured when \
searching for the backend of the domain : %s
SEVERE_ERR_LOADING_GENERATION_ID_75=An unexpected error occured when \
searching in %s for the generation ID  : %s
SEVERE_ERR_UPDATING_GENERATION_ID_76=An unexpected error %s occured  \
when updating generation ID for the domain : %s
NOTICE_BAD_GENERATION_ID_77=On suffix %s. server %s presented generation ID=%s \
when expected generation ID=%s. Consequently, replication is degraded for that server
NOTICE_RESET_GENERATION_ID_78=The generation ID has been reset for domain %s.\
Replication is now degraded for this domain
MILD_ERR_ERROR_MSG_RECEIVED_79=The following error has been received : <%s>
MILD_ERR_IGNORING_UPDATE_FROM_80=Update <%s> received from server <%s> is \
ignored due to a bad generation ID of this server
MILD_ERR_IGNORING_UPDATE_TO_81=Update <%s> will not be sent to server %s that has \
not the right generation ID
SEVERE_ERR_INIT_IMPORT_NOT_SUPPORTED_82= Initialization cannot be done because \
import is not supported by the backend %s
SEVERE_ERR_INIT_EXPORT_NOT_SUPPORTED_83= Initialization cannot be done because \
export is not supported by the backend %s
SEVERE_ERR_INIT_CANNOT_LOCK_BACKEND_84= Initialization cannot be done because \
the following error occured while locking the backend %s : %s
NOTICE_EXCEPTION_RESTARTING_SESSION_85=Caught Exception during reinitialization of \
 communication on domain %s : %s
SEVERE_ERR_EXCEPTION_LISTENING_86=Replication server caught exception while \
 listening for client connections %s
SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \
 error happened: %s
 NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \
 to be the destination of a message of type %s
opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -171,6 +171,10 @@
  // The attribute type that will be used to save the synchronization state.
  private AttributeType synchronizationStateType;
  // The attribute type that will be used to save the synchronization
  // generationId.
  private AttributeType synchronizationGenerationIdType;
  // The value containing DN of the user we'll say created the configuration.
  private AttributeValue creatorsName;
@@ -264,6 +268,9 @@
    nameFormsType = DirectoryServer.getAttributeType(ATTR_NAME_FORMS_LC, true);
    synchronizationStateType =
      DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_STATE_LC, true);
    synchronizationGenerationIdType =
      DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC,
          true);
    // Initialize the lastmod attributes.
@@ -912,6 +919,14 @@
    attrList.add(attr);
    operationalAttrs.put(synchronizationStateType, attrList);
    //  Add the synchronization GenerationId attribute.
    valueSet = DirectoryServer.getSchema().getSynchronizationGenerationId();
    attr = new Attribute(synchronizationGenerationIdType,
                         ATTR_SYNCHRONIZATION_GENERATIONID_LC, valueSet);
    attrList = new ArrayList<Attribute>(1);
    attrList.add(attr);
    operationalAttrs.put(synchronizationGenerationIdType, attrList);
    // Add all the user-defined attributes.
    for (Attribute a : userDefinedAttributes)
    {
opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -1335,12 +1335,19 @@
  public static final String ATTR_MATCHING_RULE_USE_LC = "matchingruleuse";
  /**
   * The name of the attribute that holds the sycnhronization state,
   * The name of the attribute that holds the synchronization state,
   * formatted in lowercase.
   */
  public static final String ATTR_SYNCHRONIZATION_STATE_LC = "ds-sync-state";
  /**
   * The name of the attribute that holds the relication generationId,
   * formatted in lowercase.
   */
  public static final String ATTR_SYNCHRONIZATION_GENERATIONID_LC =
       "ds-sync-generation-id";
  /**
   * The default maximum request size that should be used if none is specified
   * in the configuration.
   */
@@ -4248,6 +4255,21 @@
       NAME_PREFIX_TASK + "rebuild-max-threads";
  /**
   * The name of the objectclass that will be used for a Directory Server
   * reset generationId task definition.
   */
  public static final String OC_RESET_GENERATION_ID_TASK =
       NAME_PREFIX_TASK + "reset-generation-id";
  /**
   * The name of the attribute containing the baseDn related to the replication
   * domain to which applies the task.
   */
  public static final String ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN =
    OC_RESET_GENERATION_ID_TASK + "-domain-base-dn";
  /**
   * The name of the attribute in an import task definition that specifies
   * whether the backend should be cleared before the import.
   */
opends/src/server/org/opends/server/core/SchemaConfigManager.java
@@ -720,6 +720,14 @@
            new MatchingRuleUseSyntax());
    }
    AttributeType synchronizationGenerationIdType =
      schema.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC);
    if (synchronizationGenerationIdType == null)
    {
      synchronizationGenerationIdType = DirectoryServer.getDefaultAttributeType
          (ATTR_SYNCHRONIZATION_GENERATIONID_LC, new MatchingRuleUseSyntax());
    }
    List<Attribute> synchronizationState =
      entry.getAttribute(synchronizationStateType);
    if (synchronizationState != null && !(synchronizationState.isEmpty()))
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -41,7 +41,7 @@
/**
 * ServerState class.
 * This object is used to store the last update seem on this server
 * This object is used to store the last update seen on this server
 * from each server.
 * It is exchanged with the replication servers at connection establishment
 * time.
@@ -204,7 +204,7 @@
   */
  public ArrayList<ASN1OctetString> toASN1ArrayList()
  {
    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(0);
    synchronized (this)
    {
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -28,6 +28,8 @@
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.loggers.debug.DebugTracer;
import java.io.IOException;
@@ -99,8 +101,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Heartbeat monitor is starting, expected interval is %d",
                heartbeatInterval);
      TRACER.debugInfo("Heartbeat monitor is starting, expected interval is " +
                heartbeatInterval +
          stackTraceToSingleLineString(new Exception()));
    }
    try
    {
@@ -110,9 +113,6 @@
        long lastReceiveTime = session.getLastReceiveTime();
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          TRACER.debugInfo("Heartbeat monitor is closing the broker session " +
          "because it could not detect a heartbeat.");
          // Heartbeat is well overdue so the server is assumed to be dead.
          if (debugEnabled())
          {
@@ -140,7 +140,8 @@
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Heartbeat monitor is exiting.");
        TRACER.debugInfo("Heartbeat monitor is exiting." +
            stackTraceToSingleLineString(new Exception()));
      }
    }
  }
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -313,9 +313,6 @@
  {
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return ResultCode.SUCCESS;
    LDAPAttribute attr =
      new LDAPAttribute(REPLICATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
@@ -343,4 +340,26 @@
    }
    return op.getResultCode();
  }
  /**
   * Empty the ServerState.
   * After this call the Server State will be in the same state
   * as if it was just created.
   */
  public void clearInMemory()
  {
    super.clear();
    this.savedStatus = false;
  }
  /**
   * Empty the ServerState.
   * After this call the Server State will be in the same state
   * as if it was just created.
   */
  public void clear()
  {
    clearInMemory();
    save();
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.io.OutputStream;
/**
 * This class creates an output stream that can be used to export entries
 * to a synchonization domain.
@@ -37,7 +36,15 @@
public class ReplLDIFOutputStream
       extends OutputStream
{
  // The synchronization domain on which the export is done
  ReplicationDomain domain;
  // The number of entries to be exported
  long numEntries;
  // The current number of entries exported
  long numExportedEntries;
  String entryBuffer = "";
  /**
@@ -45,10 +52,12 @@
   * domain.
   *
   * @param domain The replication domain
   * @param numEntries The max number of entry to process.
   */
  public ReplLDIFOutputStream(ReplicationDomain domain)
  public ReplLDIFOutputStream(ReplicationDomain domain, long numEntries)
  {
    this.domain = domain;
    this.numEntries = numEntries;
  }
  /**
@@ -75,11 +84,19 @@
      endOfEntryIndex = ebytes.indexOf("\n\n");
      if ( endOfEntryIndex >= 0 )
      {
        endOfEntryIndex += 2;
        entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
        // Send the entry
        domain.sendEntryLines(entryBuffer);
        if ((numEntries>0) && (numExportedEntries > numEntries))
        {
          // This outputstream has reached the total number
          // of entries to export.
          return;
        }
        domain.exportLDIFEntry(entryBuffer);
        numExportedEntries++;
        startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
        entryBuffer = "";
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,8 +25,7 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -93,6 +92,7 @@
  private int maxRcvWindow;
  private int timeout = 0;
  private short protocolVersion;
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  /**
@@ -143,12 +143,15 @@
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * replicationServer, or zero if no heartbeats are requested.
   *
   * @param generationId The generationId for the server associated to the
   * provided serverID and for the domain associated to the provided baseDN.
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval,
      ReplSessionSecurity replSessionSecurity)
      long generationId, ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -164,6 +167,7 @@
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.generationId = generationId;
    this.replSessionSecurity = replSessionSecurity;
  }
@@ -198,7 +202,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage startMsg;
    ReplServerStartMessage replServerStartMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -207,8 +211,24 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      while ((!connected) && (!shutdown) && (receivedResponse))
@@ -240,7 +260,7 @@
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, isSslEncryption);
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
@@ -248,7 +268,7 @@
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            startMsg = (ReplServerStartMessage) session.receive();
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
@@ -257,7 +277,7 @@
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                startMsg.getVersion());
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
@@ -276,7 +296,7 @@
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              startMsg.getServerState().getMaxChangeNumber(serverID);
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            ChangeNumber ourMaxChangeNumber =
@@ -285,7 +305,7 @@
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              replicationServer = ServerAddr.toString();
              maxSendWindow = startMsg.getWindowSize();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
@@ -298,7 +318,8 @@
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server);
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                logError(message);
              }
              else
@@ -341,7 +362,7 @@
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = startMsg.getWindowSize();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
@@ -371,8 +392,9 @@
          }
          catch (Exception e)
          {
            Message message =
                    ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage());
            Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
          }
          finally
@@ -392,7 +414,9 @@
              }
            }
          }
        }
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
@@ -401,12 +425,16 @@
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
      if (connected)
      {
        // This server has connected correctly.
@@ -462,9 +490,9 @@
  /**
   * restart the ReplicationBroker.
   */
  private void reStart()
  public void reStart()
  {
    reStart(null);
    reStart(this.session);
  }
  /**
@@ -472,7 +500,7 @@
   *
   * @param failingSession the socket which failed
   */
  private void reStart(ProtocolSession failingSession)
  public void reStart(ProtocolSession failingSession)
  {
    try
    {
@@ -498,7 +526,8 @@
      } catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()));
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
         baseDn.toNormalizedString(), e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -533,6 +562,13 @@
        // choice than to return without sending the ReplicationMessage
        // and relying on the resend procedure of the connect phase to
        // fix the problem when we finally connect.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() Publishing a " +
              " message is not possible due to existing connection error.");
        }
        return;
      }
@@ -601,12 +637,22 @@
          } catch (InterruptedException e1)
          {
            // ignore
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                  "IO exception raised : " + e.getLocalizedMessage());
            }
          }
        }
      }
      catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() " +
              "Interrupted exception raised." + e.getLocalizedMessage());
        }
      }
    }
  }
@@ -628,7 +674,7 @@
    {
      if (!connected)
      {
        reStart();
        reStart(null);
      }
      ProtocolSession failingSession = session;
@@ -663,6 +709,9 @@
          Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
          logError(message);
          debugInfo("ReplicationBroker.receive() " + baseDn +
              " Exception raised." + e + e.getLocalizedMessage());
          this.reStart(failingSession);
        }
      }
@@ -683,7 +732,8 @@
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("ReplicationBroker Stop Closing session");
        debugInfo("ReplicationBroker is stopping. and will" +
          "close the connection");
      }
      if (session != null)
@@ -713,6 +763,20 @@
  }
  /**
   * Set the value of the generationId for that broker. Normally the
   * generationId is set through the constructor but there are cases
   * where the value of the generationId must be changed while the broker
   * already exist for example after an on-line import.
   *
   * @param generationId The value of the generationId.
   *
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   *
@@ -857,6 +921,13 @@
    return !connectionError;
  }
  private boolean debugEnabled() { return true; }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    TRACER.debugInfo(s);
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -27,8 +27,6 @@
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -38,12 +36,14 @@
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -52,12 +52,14 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Adler32;
import java.io.OutputStream;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.api.AlertGenerator;
import org.opends.server.api.Backend;
import org.opends.server.api.DirectoryThread;
@@ -77,6 +79,9 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
@@ -89,6 +94,7 @@
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
@@ -100,6 +106,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.RDN;
import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
@@ -172,6 +179,9 @@
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private long generationId = -1;
  private long rejectedGenerationId = -1;
  private boolean requestedResetSinceLastStart = false;
  /**
   * This object is used to store the list of update currently being
@@ -224,7 +234,7 @@
  private int window = 100;
  /**
   * The isoalation policy that this domain is going to use.
   * The isolation policy that this domain is going to use.
   * This field describes the behavior of the domain when an update is
   * attempted and the domain could not connect to any Replication Server.
   * Possible values are accept-updates or deny-updates, but other values
@@ -254,17 +264,20 @@
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry left to be processed
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    boolean checksumOutput = false;
    // The exception raised when any
    DirectoryException exception = null;
    long checksumOutputValue = (long)0;
    /**
     * Initializes the counters of the task with the provider value.
     * Initializes the import/export counters with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initTaskCounters(long count)
    public void initImportExportCounters(long count)
    {
      entryCount = count;
      entryLeftCount = count;
@@ -288,7 +301,7 @@
     * Update the counters of the task for each entry processed during
     * an import or export.
     */
    public void updateTaskCounters()
    public void updateCounters()
    {
      entryLeftCount--;
@@ -342,12 +355,12 @@
    configDn = configuration.dn();
    /*
    * Modify conflicts are solved for all suffixes but the schema suffix
    * because we don't want to store extra information in the schema
    * ldif files.
    * This has no negative impact because the changes on schema should
    * not produce conflicts.
    */
     * Modify conflicts are solved for all suffixes but the schema suffix
     * because we don't want to store extra information in the schema
     * ldif files.
     * This has no negative impact because the changes on schema should
     * not produce conflicts.
     */
    if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
    {
      solveConflictFlag = false;
@@ -370,27 +383,33 @@
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    backend = retrievesBackend(baseDN);
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
                                  baseDN.toNormalizedString()));
    }
    try
    {
      generationId = loadGenerationId();
    }
    catch (DirectoryException e)
    {
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
    }
    /*
     * create the broker object used to publish and receive changes
     */
    broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
        maxReceiveDelay, maxSendQueue, maxSendDelay, window,
        heartbeatInterval, new ReplSessionSecurity(configuration));
        heartbeatInterval, generationId,
        new ReplSessionSecurity(configuration));
    broker.start(replicationServers);
    // Retrieves the related backend and its config entry
    try
    {
      retrievesBackendInfos(baseDN);
    } catch (DirectoryException e)
    {
      // The backend associated to this suffix is not able to
      // perform export and import.
      // The replication can continue but this replicationDomain
      // won't be able to use total update.
    }
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
@@ -558,7 +577,7 @@
   * If not set the ResultCode and the response message,
   * interrupt the operation, and return false
   *
   * @param   op  The Operation that needs to be checked.
   * @param   Operation  The Operation that needs to be checked.
   *
   * @return  true when it OK to process the Operation, false otherwise.
   *          When false is returned the resultCode and the reponse message
@@ -798,7 +817,11 @@
              // The server is in the shutdown process
              return null;
            }
            log(Message.raw("Broker received message :" + msg));
            if (debugEnabled())
              if (!(msg instanceof HeartbeatMessage))
                TRACER.debugInfo("Message received <" + msg + ">");
            if (msg instanceof AckMessage)
            {
              AckMessage ack = (AckMessage) msg;
@@ -808,7 +831,7 @@
            {
              // Another server requests us to provide entries
              // for a total update
              initMsg = (InitializeRequestMessage) msg;
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
            {
@@ -822,20 +845,19 @@
                // bunch of entries from the remote server and we
                // want the import thread to catch them and
                // not the ListenerThread.
                importBackend(importMsg);
                initialize(importMsg);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
                mb.append("Backend ID: ");
                mb.append(backend.getBackendID());
                log(mb.toMessage());
                // Return an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                TRACER.debugInfo(Message.toString(mb.toMessage()));
                broker.publish(errorMsg);
              }
            }
@@ -850,6 +872,39 @@
                //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
              {
                /* We can receive an error message from the replication server
                 * in the following cases :
                 * - we connected with an incorrect generation id
                 */
                ErrorMessage errorMsg = (ErrorMessage)msg;
                logError(ERR_ERROR_MSG_RECEIVED.get(
                           errorMsg.getDetails()));
                if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId())
                {
                  TRACER.debugInfo("requestedResetSinceLastStart=" +
                             requestedResetSinceLastStart +
                            "rejectedGenerationId=" + rejectedGenerationId);
                  if (requestedResetSinceLastStart && (rejectedGenerationId>0))
                  {
                    // When the last generation presented was refused and we are
                    // the 'reseter' server then restart automatically to become
                    // the 'master'
                    state.clear();
                    rejectedGenerationId = -1;
                    requestedResetSinceLastStart = false;
                    broker.stop();
                    broker.start(replicationServers);
                  }
                }
                if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId())
                {
                  rejectedGenerationId = generationId;
                }
              }
            }
            else if (msg instanceof UpdateMessage)
            {
@@ -875,7 +930,7 @@
        {
          try
          {
            initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
                null);
          }
          catch(DirectoryException de)
@@ -2100,7 +2155,7 @@
  public void disable()
  {
    state.save();
    state.clear();
    state.clearInMemory();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
@@ -2126,20 +2181,252 @@
   * The domain will connect back to a replication Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
   * The generationId will be retrieved or computed if necessary.
   * The ServerState will also be read again from the local database.
   */
  public void enable()
  {
    state.clear();
    state.clearInMemory();
    state.loadState();
    disabled = false;
    try
    {
      generationId = loadGenerationId();
    }
    catch (Exception e)
    {
      /* TODO should mark that replicationServer service is
       * not available, log an error and retry upon timeout
       * should we stop the modifications ?
       */
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
      return;
    }
    // After an on-line import, the value of the generationId is new
    // and it is necessary for the broker to send this new value as part
    // of the serverStart message.
    broker.setGenerationId(generationId);
    broker.start(replicationServers);
    createListeners();
  }
  /**
   * Compute the data generationId associated with the current data present
   * in the backend for this domain.
   * @return The computed generationId.
   * @throws DirectoryException When an error occurs.
   */
  public long computeGenerationId() throws DirectoryException
  {
    long bec = backend.getEntryCount();
    if (bec<0)
      backend = this.retrievesBackend(baseDN);
    bec = backend.getEntryCount();
    this.acquireIEContext();
    ieContext.checksumOutput = true;
    ieContext.entryCount = (bec<1000?bec:1000);
    ieContext.entryLeftCount = ieContext.entryCount;
    exportBackend();
    long genId = ieContext.checksumOutputValue;
    if (debugEnabled())
      TRACER.debugInfo("Computed generationId: #entries=" + bec +
               " generationId=" + ieContext.checksumOutputValue);
    ieContext.checksumOutput = false;
    this.releaseIEContext();
    return genId;
  }
  /**
   * Returns the generationId set for this domain.
   *
   * @return The generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
  /**
   * The attribute name used to store the state in the backend.
   */
  protected static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  /**
   * Stores the value of the generationId.
   * @param generationId The value of the generationId.
   * @return a ResultCode indicating if the method was successfull.
   */
  public ResultCode saveGenerationId(long generationId)
  {
    // The generationId is stored in the root entry of the domain.
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
    ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
    values.add(value);
    LDAPAttribute attr =
      new LDAPAttribute(REPLICATION_GENERATION_ID, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
    mods.add(mod);
    ModifyOperationBasis op =
      new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          new ArrayList<Control>(0), asn1BaseDn,
          mods);
    op.setInternalOperation(true);
    op.setSynchronizationOperation(true);
    op.setDontSynchronize(true);
    op.run();
    ResultCode result = op.getResultCode();
    if (result != ResultCode.SUCCESS)
    {
      Message message = ERR_UPDATING_GENERATION_ID.get(
                          op.getResultCode().getResultCodeName() + " " +
                          op.getErrorMessage(),
                          baseDN.toString());
      logError(message);
    }
    return result;
  }
  /**
   * Load the GenerationId from the root entry of the domain
   * from the REPLICATION_GENERATION_ID attribute in database
   * to memory, or compute it if not found.
   *
   * @return generationId The retrieved value of generationId
   * @throws DirectoryException When an error occurs.
   */
  public long loadGenerationId()
  throws DirectoryException
  {
    long generationId=-1;
    if (debugEnabled())
      TRACER.debugInfo(
          "Attempt to read generation ID from DB " + baseDN.toString());
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    boolean found = false;
    LDAPFilter filter;
    try
    {
      filter = LDAPFilter.decode("objectclass=*");
    }
    catch (LDAPException e)
    {
      // can not happen
      return -1;
    }
    /*
     * Search the database entry that is used to periodically
     * save the ServerState
     */
    InternalSearchOperation search = null;
    LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
    attributes.add(REPLICATION_GENERATION_ID);
    search = conn.processSearch(asn1BaseDn,
        SearchScope.BASE_OBJECT,
        DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
        filter,attributes);
    if (((search.getResultCode() != ResultCode.SUCCESS)) &&
        ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
    {
      Message message = ERR_SEARCHING_GENERATION_ID.get(
          search.getResultCode().getResultCodeName() + " " +
          search.getErrorMessage(),
          baseDN.toString());
      logError(message);
    }
    SearchResultEntry resultEntry = null;
    if (search.getResultCode() == ResultCode.SUCCESS)
    {
      LinkedList<SearchResultEntry> result = search.getSearchEntries();
      resultEntry = result.getFirst();
      if (resultEntry != null)
      {
        AttributeType synchronizationGenIDType =
          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
        List<Attribute> attrs =
          resultEntry.getAttribute(synchronizationGenIDType);
        if (attrs != null)
        {
          Attribute attr = attrs.get(0);
          LinkedHashSet<AttributeValue> values = attr.getValues();
          if (values.size()!=1)
          {
            Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), "#Values != 1");
            logError(message);
          }
          else
          {
            found=true;
            try
            {
              generationId = Long.decode(values.iterator().next().
                  getStringValue());
            }
            catch(Exception e)
            {
              Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), e.getLocalizedMessage());
              logError(message);
            }
          }
        }
      }
    }
    if (!found)
    {
      generationId = computeGenerationId();
      saveGenerationId(generationId);
      if (debugEnabled())
        TRACER.debugInfo("Generation ID created for domain base DN=" +
            baseDN.toString() +
            " generationId=" + generationId);
    }
    else
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "Generation ID successfully read from domain base DN=" + baseDN +
            " generationId=" + generationId);
    }
    return generationId;
  }
  /**
   * Reset the generationId of this domain in the whole topology.
   * A message is sent to the Replication Servers for them to reset
   * their change dbs.
   */
  public void resetGenerationId()
  {
    requestedResetSinceLastStart = true;
    ResetGenerationId genIdMessage = new ResetGenerationId();
    broker.publish(genIdMessage);
  }
  /**
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correclty save.
   */
@@ -2175,18 +2462,20 @@
      {
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugInfo("Import: EntryBytes received " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        log(Message.raw("receiveEntryBytes: received " + msg));
        if (msg instanceof EntryMessage)
        {
          // FIXME
          EntryMessage entryMsg = (EntryMessage)msg;
          byte[] entryBytes = entryMsg.getEntryBytes().clone();
          ieContext.updateTaskCounters();
          ieContext.updateCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
@@ -2202,8 +2491,9 @@
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ieContext.exception = new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails());
          ieContext.exception = new DirectoryException(
                                      ResultCode.OTHER,
                                      errorMsg.getDetails());
          return null;
        }
        else
@@ -2213,9 +2503,10 @@
      }
      catch(Exception e)
      {
        // TODO: i18n
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            Message.raw("received an unexpected message type"), e);
        return null;
            Message.raw("received an unexpected message type" +
                e.getLocalizedMessage()));
      }
    }
  }
@@ -2299,26 +2590,17 @@
  }
  /**
   * Log debug message.
   * @param message The message to log.
   * Export the entries from the backend.
   * The ieContext must have been set before calling.
   *
   * @throws DirectoryException when an error occured
   */
  private void log(Message message)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("DebugInfo" + message);
    }
  }
  /**
   * Export the entries.
   * @throws DirectoryException when an error occurred
   */
  protected void exportBackend() throws DirectoryException
  protected void exportBackend()
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    backend = retrievesBackend(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
@@ -2344,13 +2626,53 @@
          ResultCode.OTHER, message, null);
    }
    ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
    OutputStream os;
    ReplLDIFOutputStream ros;
    if (ieContext.checksumOutput)
    {
      ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
      os = new CheckedOutputStream(ros, new Adler32());
      try
      {
        os.write((Long.toString(ieContext.entryCount)).getBytes());
      }
      catch(Exception e)
      {
        // Should never happen
      }
    }
    else
    {
      ros = new ReplLDIFOutputStream(this, (short)-1);
      os = ros;
    }
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    // baseDN branch is the only one included in the export
    List<DN> includeBranches = new ArrayList<DN>(1);
    includeBranches.add(this.baseDN);
    exportConfig.setIncludeBranches(includeBranches);
    // For the checksum computing mode, only consider the 'stable' attributes
    if (ieContext.checksumOutput)
    {
      String includeAttributeStrings[] =
        {"objectclass", "sn", "cn", "entryuuid"};
      HashSet<AttributeType> includeAttributes;
      includeAttributes = new HashSet<AttributeType>();
      for (String attrName : includeAttributeStrings)
      {
        AttributeType attrType  = DirectoryServer.getAttributeType(attrName);
        if (attrType == null)
        {
          attrType = DirectoryServer.getDefaultAttributeType(attrName);
        }
        includeAttributes.add(attrType);
      }
      exportConfig.setIncludeAttributes(includeAttributes);
    }
    //  Launch the export.
    try
    {
@@ -2374,8 +2696,19 @@
    }
    finally
    {
      //  Clean up after the export by closing the export config.
      exportConfig.close();
      if ((ieContext != null) && (ieContext.checksumOutput))
      {
        ieContext.checksumOutputValue =
         ((CheckedOutputStream)os).getChecksum().getValue();
      }
      else
      {
        // Clean up after the export by closing the export config.
        // Will also flush the export and export the remaining entries.
        // This is a real export where writer has been initialized.
        exportConfig.close();
      }
      //  Release the shared lock on the backend.
      try
@@ -2403,54 +2736,26 @@
  }
  /**
   * Retrieves the backend object related to the domain and the backend's
   * config entry. They will be used for import and export.
   * TODO This should be in a shared package rather than here.
   * Retrieves the backend related to the domain.
   *
   * @return The backend of that domain.
   * @param baseDN The baseDN to retrieve the backend
   * @throws DirectoryException when an error occired
   */
  protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
  protected Backend retrievesBackend(DN baseDN)
  {
    // Retrieves the backend related to this domain
    Backend domainBackend = DirectoryServer.getBackend(baseDN);
    if (domainBackend == null)
    {
      Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, "");
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    // Retrieves its configuration
    BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend);
    if (backendCfg == null)
    {
      Message message =
          ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get();
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    this.backend = domainBackend;
    if (! domainBackend.supportsLDIFImport())
    {
      Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get(
              String.valueOf(baseDN));
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
    }
    return DirectoryServer.getBackend(baseDN);
  }
  /**
   * Sends lDIFEntry entry lines to the export target currently set.
   * Exports an entry in LDIF format.
   *
   * @param lDIFEntry The lines for the LDIF entry.
   * @param  lDIFEntry The entry to be exported..
   *
   * @throws IOException when an error occurred.
   */
  public void sendEntryLines(String lDIFEntry) throws IOException
  public void exportLDIFEntry(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // we just let down the export.
@@ -2461,12 +2766,14 @@
      throw ioe;
    }
    // new entry then send the current one
    EntryMessage entryMessage = new EntryMessage(
    if (ieContext.checksumOutput == false)
    {
      // Actually send the entry
      EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    ieContext.updateTaskCounters();
      broker.publish(entryMessage);
    }
    ieContext.updateCounters();
  }
  /**
@@ -2477,9 +2784,11 @@
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initialize(short source, Task initTask)
  public void initializeFromRemote(short source, Task initTask)
  throws DirectoryException
  {
    // TRACER.debugInfo("Entering initializeFromRemote");
    acquireIEContext();
    ieContext.initializeTask = initTask;
@@ -2495,13 +2804,14 @@
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representaing the source
   * @param sourceString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    TRACER.debugInfo("Entering decodeSource");
    short  source = 0;
    Throwable cause = null;
    try
@@ -2512,8 +2822,6 @@
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        log(Message.raw("Source decoded for import:" + source));
        return source;
      }
    }
@@ -2525,11 +2833,15 @@
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_IMPORT_SOURCE.get();
    if (cause != null)
    {
      throw new DirectoryException(
          resultCode, message, cause);
    }
    else
    {
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
@@ -2600,57 +2912,65 @@
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, Task initTask)
  public void initializeRemote(short target, Task initTask)
  throws DirectoryException
  {
    initializeTarget(target, serverId, initTask);
    initializeRemote(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization has been
   * initiated by another server than this one.
   * specified by the target argument when this initialization specifying the
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeTarget(short target, short requestorID, Task initTask)
  public void initializeRemote(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    retrievesBackendInfos(this.baseDN);
    acquireIEContext();
    ieContext.exportTarget = target;
    if (initTask != null)
    {
      ieContext.initializeTask = initTask;
      ieContext.initTaskCounters(backend.getEntryCount());
    }
    // Send start message to the peer
    InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
        baseDN, serverId, ieContext.exportTarget, requestorID,
        backend.getEntryCount());
    log(Message.raw("SD : publishes " + initializeMessage +
        " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount));
    broker.publish(initializeMessage);
    try
    {
      // FIXME Temporary workaround - will probably be fixed when implementing
      // dynamic config
      backend = retrievesBackend(this.baseDN);
      if (!backend.supportsLDIFExport())
      {
        Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
                            backend.getBackendID().toString());
        logError(message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      acquireIEContext();
      ieContext.exportTarget = target;
      if (initTask != null)
      {
        ieContext.initializeTask = initTask;
        ieContext.initImportExportCounters(backend.getEntryCount());
      }
      // Send start message to the peer
      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
          baseDN, serverId, ieContext.exportTarget, requestorID,
          backend.getEntryCount());
      broker.publish(initializeMessage);
      exportBackend();
      // Notify the peer of the success
      DoneMessage doneMsg = new DoneMessage(serverId,
        initializeMessage.getDestination());
          initializeMessage.getDestination());
      broker.publish(doneMsg);
      releaseIEContext();
@@ -2658,7 +2978,9 @@
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject());
      ErrorMessage errorMsg =
        new ErrorMessage(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
      releaseIEContext();
@@ -2686,8 +3008,9 @@
    StringBuilder failureReason = new StringBuilder();
    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
    {
      Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get(
          backend.getBackendID(), String.valueOf(failureReason));
      Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
                          backend.getBackendID(),
                          String.valueOf(failureReason));
      logError(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
@@ -2698,14 +3021,22 @@
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void importBackend(InitializeTargetMessage initializeMessage)
  protected void initialize(InitializeTargetMessage initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    if (!backend.supportsLDIFImport())
    {
      Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
                          backend.getBackendID().toString());
      logError(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    try
    {
      log(Message.raw("startImport"));
      if (initializeMessage.getRequestorID() == serverId)
      {
        // The import responds to a request we did so the IEContext
@@ -2718,7 +3049,7 @@
      ieContext.importSource = initializeMessage.getsenderID();
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initTaskCounters(initializeMessage.getEntryCount());
      ieContext.initImportExportCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend);
@@ -2737,20 +3068,14 @@
      // Process import
      this.backend.importLDIF(importConfig);
      TRACER.debugInfo("The import has ended successfully.");
      stateSavingDisabled = false;
      // Re-exchange state with SS
      broker.stop();
      broker.start(replicationServers);
    }
    catch(Exception e)
    {
      DirectoryException de =
        new DirectoryException(
            ResultCode.OTHER, Message.raw(e.getLocalizedMessage()));
      ieContext.exception = de;
      throw (de);
      de = new DirectoryException(ResultCode.OTHER,
                                  Message.raw(e.getLocalizedMessage()));
    }
    finally
    {
@@ -2766,12 +3091,33 @@
        ((InitializeTask)ieContext.initializeTask).
        setState(ieContext.updateTaskCompletionState(),ieContext.exception);
      }
      releaseIEContext();
      log(Message.raw("End importBackend"));
      // Retrieves the generation ID associated with the data imported
      try
      {
        generationId = loadGenerationId();
      }
      catch (DirectoryException e)
      {
        logError(ERR_LOADING_GENERATION_ID.get(
            baseDN.toNormalizedString(),
            e.getLocalizedMessage()));
      }
      rejectedGenerationId = -1;
      if (debugEnabled())
        TRACER.debugInfo(
            "After import, the replication plugin restarts connections" +
            " to all RSs to provide new generation ID=" + generationId);
      broker.setGenerationId(generationId);
      // Re-exchange generationID and state with RS
      broker.reStart();
    }
    // Success
    // Sends up the root error.
    if (de != null)
      throw de;
  }
  /**
@@ -2794,7 +3140,6 @@
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    // FIXME setBackendEnabled should be part taskUtils ?
    TaskUtils.enableBackend(backend.getBackendID());
  }
@@ -2988,6 +3333,16 @@
  }
  /**
   * Check if the domain is connected to a ReplicationServer.
   *
   * @return true if the server is connected, false if not.
   */
  public boolean isConnected()
  {
    return broker.isConnected();
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.
   */
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -151,7 +151,10 @@
    attributes.add(attr);
    attributes.add(new Attribute("ssl-encryption",
                                 String.valueOf(domain.isSessionEncrypted())));
        String.valueOf(domain.isSessionEncrypted())));
    attributes.add(new Attribute("generation-id",
        String.valueOf(domain.getGenerationId())));
    return attributes;
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -27,10 +27,15 @@
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server or a replication server when an error
@@ -41,6 +46,9 @@
{
  private static final long serialVersionUID = 2726389860247088266L;
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  // Specifies the messageID built form the error that was detected
  private int msgID;
@@ -48,10 +56,11 @@
  private Message details = null;
  /**
   * Create a InitializeMessage.
   * Creates an ErrorMessage providing the destination server.
   *
   * @param sender The server ID of the server that send this message.
   * @param destination The destination server or servers of this message.
   * @param details The details of the error.
   * @param details The message containing the details of the error.
   */
  public ErrorMessage(short sender, short destination,
                      Message details)
@@ -59,10 +68,13 @@
    super(sender, destination);
    this.msgID  = details.getDescriptor().getId();
    this.details = details;
    if (debugEnabled())
      TRACER.debugInfo(" Creating error message" + this.toString());
  }
  /**
   * Create a InitializeMessage.
   * Creates an ErrorMessage.
   *
   * @param destination replication server id
   * @param details details of the error
@@ -72,13 +84,17 @@
    super((short)-2, destination);
    this.msgID  = details.getDescriptor().getId();
    this.details = details;
    if (debugEnabled())
      TRACER.debugInfo(this.toString());
  }
  /**
   * Creates a new InitializeMessage by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the Message
   * Creates a new ErrorMessage by decoding the provided byte array.
   *
   * @param  in A byte array containing the encoded information for the Message
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   *                             encoded message.
   */
  public ErrorMessage(byte[] in) throws DataFormatException
  {
@@ -185,4 +201,18 @@
      return null;
    }
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  public String toString()
  {
    return "ErrorMessage=["+
      " sender=" + this.senderID +
      " destination=" + this.destination +
      " msgID=" + this.msgID +
      " details=" + this.details + "]";
  }
}
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java
@@ -155,4 +155,14 @@
      return null;
    }
  }
  /**
   * Get a string representation of this object.
   * @return A string representation of this object.
   */
  public String toString()
  {
    return "InitializeRequestMessage: baseDn="+baseDn+" senderId="+senderID +
    " destination=" + destination;
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -51,6 +51,7 @@
public class ReplServerInfoMessage extends ReplicationMessage
{
  private List<String> connectedServers = null;
  private long generationId;
  /**
   * Creates a new changelogInfo message from its encoded form.
@@ -68,15 +69,23 @@
        throw new DataFormatException(
        "Input is not a valid changelogInfo Message.");
      connectedServers = new ArrayList<String>();
      int pos = 1;
      /* read the generationId */
      int length = getNextLength(in, pos);
      generationId = Long.valueOf(new String(in, pos, length,
          "UTF-8"));
      pos += length +1;
      /* read the connected servers */
      connectedServers = new ArrayList<String>();
      while (pos < in.length)
      {
        /*
         * Read the next server ID
         * first calculate the length then construct the string
         */
        int length = getNextLength(in, pos);
        length = getNextLength(in, pos);
        connectedServers.add(new String(in, pos, length, "UTF-8"));
        pos += length +1;
      }
@@ -92,10 +101,14 @@
   * connected servers.
   *
   * @param connectedServers The list of currently connected servers ID.
   * @param generationId     The generationId currently associated with this
   *                         domain.
   */
  public ReplServerInfoMessage(List<String> connectedServers)
  public ReplServerInfoMessage(List<String> connectedServers,
      long generationId)
  {
    this.connectedServers = connectedServers;
    this.generationId = generationId;
  }
  /**
@@ -110,6 +123,12 @@
      /* Put the message type */
      oStream.write(MSG_TYPE_REPL_SERVER_INFO);
      // Put the generationId
      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
      oStream.write(0);
      // Put the servers
      if (connectedServers.size() >= 1)
      {
        for (String server : connectedServers)
@@ -119,6 +138,7 @@
          oStream.write(0);
        }
      }
      return oStream.toByteArray();
    }
    catch (IOException e)
@@ -139,4 +159,29 @@
  {
    return connectedServers;
  }
  /**
   * Get the generationId from this message.
   * @return The generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String csrvs = "";
    for (String s : connectedServers)
    {
      csrvs += s + "/";
    }
    return ("ReplServerInfoMessage: genId=" + getGenerationId() +
            " Connected peers:" + csrvs);
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
@@ -65,6 +65,7 @@
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
   */
@@ -72,9 +73,10 @@
                               int windowSize,
                               ServerState serverState,
                               short protocolVersion,
                               long generationId,
                               boolean sslEncryption)
  {
    super(protocolVersion);
    super(protocolVersion, generationId);
    this.serverId = serverId;
    this.serverURL = serverURL;
    if (baseDn != null)
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -54,6 +54,8 @@
  static final byte MSG_TYPE_ERROR = 14;
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
  static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -76,6 +78,7 @@
   * MSG_TYPE_ERROR
   * MSG_TYPE_WINDOW_PROBE
   * MSG_TYPE_REPL_SERVER_INFO
   * MSG_TYPE_RESET_GENERATION_ID
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
@@ -140,6 +143,9 @@
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      case MSG_TYPE_RESET_GENERATION_ID:
        msg = new ResetGenerationId(buffer);
      break;
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbe(buffer);
      break;
opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
New file
@@ -0,0 +1,78 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.util.zip.DataFormatException;
/**
 * This message is used by an LDAP server to communicate to the topology
 * that the generation must be reset for the domain.
 */
public class ResetGenerationId extends ReplicationMessage implements
    Serializable
{
  private static final long serialVersionUID = 7657049716115572226L;
  /**
   * Creates a new message.
   */
  public ResetGenerationId()
  {
  }
  /**
   * Creates a new GenerationIdMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the
   *           WindowMessage.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public ResetGenerationId(byte[] in) throws DataFormatException
  {
    if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
      throw new
      DataFormatException("input is not a valid GenerationId Message");
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    int length = 1;
    byte[] resultByteArray = new byte[length];
    /* put the type of the operation */
    resultByteArray[0] = MSG_TYPE_RESET_GENERATION_ID;
    return resultByteArray;
  }
}
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -69,7 +69,9 @@
  private boolean sslEncryption;
  /**
   * Create a new ServerStartMessage.
   * Creates a new ServerStartMessage. This message is to be sent by an LDAP
   * Server after being connected to a replication server for a given
   * replication domain.
   *
   * @param serverId The serverId of the server for which the ServerStartMessage
   *                 is created.
@@ -82,6 +84,7 @@
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
   */
@@ -91,9 +94,10 @@
                            long heartbeatInterval,
                            ServerState serverState,
                            short protocolVersion,
                            long generationId,
                            boolean sslEncryption)
  {
    super(protocolVersion);
    super(protocolVersion, generationId);
    this.serverId = serverId;
    this.baseDn = baseDn.toString();
@@ -128,10 +132,6 @@
  {
    super(MSG_TYPE_SERVER_START, in);
    /* The ServerStartMessage is encoded in the form :
     * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
     */
    try
    {
      /* first bytes are the header */
@@ -303,11 +303,6 @@
  @Override
  public byte[] getBytes()
  {
    /*
     * ServerStartMessage contains.
     * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -37,6 +37,7 @@
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
/**
 * This class Implement a protocol session using a basic socket and relying on
@@ -91,7 +92,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugVerbose("Closing SocketSession.");
      TRACER.debugInfo("Closing SocketSession."
          + stackTraceToSingleLineString(new Exception()));
    }
    socket.close();
  }
@@ -104,6 +106,12 @@
  {
    byte[] buffer = msg.getBytes();
    String str = String.format("%08x", buffer.length);
    if (debugEnabled())
    {
      TRACER.debugInfo("SocketSession publish <" + str + ">");
    }
    byte[] sendLengthBuf = str.getBytes();
    output.write(sendLengthBuf);
opends/src/server/org/opends/server/replication/protocol/StartMessage.java
@@ -39,6 +39,7 @@
public abstract class StartMessage extends ReplicationMessage
{
  private short protocolVersion;
  private long  generationId;
  /**
   * The length of the header of this message.
@@ -49,11 +50,14 @@
   * Create a new StartMessage.
   *
   * @param protocolVersion The Replication Protocol version of the server
   * for which the StartMessage is created.
   *                        for which the StartMessage is created.
   * @param generationId    The generationId for this server.
   *
   */
  public StartMessage(short protocolVersion)
  public StartMessage(short protocolVersion, long generationId)
  {
    this.protocolVersion = protocolVersion;
    this.generationId = generationId;
  }
  /**
@@ -86,11 +90,14 @@
  throws UnsupportedEncodingException
  {
    byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8");
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version>
     */
    int length = 1 + versionByte.length + 1 +
                     byteGenerationID.length + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
@@ -100,7 +107,10 @@
    int pos = 1;
    /* put the protocol version */
    headerLength = addByteArray(versionByte, encodedMsg, pos);
    pos = addByteArray(versionByte, encodedMsg, pos);
    /* put the generationId */
    headerLength = addByteArray(byteGenerationID, encodedMsg, pos);
    return encodedMsg;
  }
@@ -129,6 +139,14 @@
      protocolVersion = Short.valueOf(
          new String(encodedMsg, pos, length, "UTF-8"));
      pos += length + 1;
      /* read the generationId */
      length = getNextLength(encodedMsg, pos);
      generationId = Long.valueOf(new String(encodedMsg, pos, length,
          "UTF-8"));
      pos += length +1;
      return pos;
    } catch (UnsupportedEncodingException e)
    {
@@ -147,4 +165,14 @@
  {
    return protocolVersion;
  }
  /**
   * Get the generationId from this message.
   * @return The generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
}
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -65,11 +65,19 @@
 */
public class DbHandler implements Runnable
{
  // This queue hold all the updates not yet saved to stable storage
  // it is only used as a temporary placeholder so that the write
  // The msgQueue holds all the updates not yet saved to stable storage.
  // This list is only used as a temporary placeholder so that the write
  // in the stable storage can be grouped for efficiency reason.
  // it is never read back by replicationServer threads that are responsible
  // Adding an update synchronously add the update to this list.
  // A dedicated thread loops on flush() and trim().
  // flush() : get a number of changes from the in memory list by block
  //           and write them to the db.
  // trim()  : deletes from the DB a number of changes that are older than a
  //           certain date.
  //
  // Changes are not read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  //
  private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>();
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
@@ -81,6 +89,8 @@
  private boolean done = false;
  private DirectoryThread thread = null;
  private Object flushLock = new Object();
  private ReplicationDbEnv dbenv;
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
@@ -90,23 +100,29 @@
  final static int MSG_QUEUE_LOWMARK = 4000;
  /**
   * the trim age in milliseconds.
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimage;
  /**
   * Creates a New dbHandler associated to a given LDAP server.
   * Creates a new dbHandler associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param baseDn the baseDn for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param generationId The generationId of the data contained in the LDAP
   * server for this domain.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
      ReplicationDbEnv dbenv, long generationId)
         throws DatabaseException
  {
    this.dbenv = dbenv;
    this.serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
@@ -261,7 +277,7 @@
   *
   * @param number the number of changes to be removed.
   */
  private void clear(int number)
  private void clearQueue(int number)
  {
    synchronized (msgQueue)
    {
@@ -346,7 +362,7 @@
  }
  /**
   * Flush old change information from this replicationServer database.
   * Trim old changes from this replicationServer database.
   * @throws DatabaseException In case of database problem.
   */
  private void trim() throws DatabaseException, Exception
@@ -417,7 +433,7 @@
        db.addEntries(changes);
        // remove the changes from the list of changes to be saved.
        clear(changes.size());
        clearQueue(changes.size());
      }
    } while (size >=500);
  }
@@ -517,4 +533,22 @@
    trimage = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws DatabaseException When an exception occurs while removing the
   * changes from the DB.
   * @throws Exception When an exception occurs while accessing a resource
   * from the DB.
   *
   */
  public void clear() throws DatabaseException, Exception
  {
    synchronized(flushLock)
    {
      msgQueue.clear();
    }
    db.clear();
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -28,6 +28,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -47,6 +49,7 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import com.sleepycat.je.DatabaseException;
@@ -106,6 +109,15 @@
    new ConcurrentHashMap<Short, DbHandler>();
  private ReplicationServer replicationServer;
  /* GenerationId management */
  private long generationId = -1;
  private boolean generationIdSavedStatus = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new ReplicationCache associated to the DN baseDn.
   *
@@ -117,7 +129,13 @@
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
  }
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " Created Cache for " + baseDn + " " +
        stackTraceToSingleLineString(new Exception()));
}
  /**
   * Add an update that has been received to the list of
@@ -138,6 +156,7 @@
     * other replication server before pushing it to the LDAP servers
     */
    short id  = update.getChangeNumber().getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
@@ -158,19 +177,21 @@
      }
    }
    // look for the dbHandler that is responsible for the master server which
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler = null;
    synchronized (sourceDbHandlers)
    {
      short id  = update.getChangeNumber().getServerId();
      dbHandler   = sourceDbHandlers.get(id);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id, baseDn);
        } catch (DatabaseException e)
          dbHandler = replicationServer.newDbHandler(id,
              baseDn, generationId);
          generationIdSavedStatus = true;
        }
        catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
@@ -250,6 +271,15 @@
      }
      connectedServers.put(handler.getServerId(), handler);
      // It can be that the server that connects here is the
      // first server connected for a domain.
      // In that case, we will establish the appriopriate connections
      // to the other repl servers for this domain and receive
      // their ReplServerInfo messages.
      // FIXME: Is it necessary to end this above processing BEFORE listening
      //        to incoming messages for that domain ? But the replica
      //        would raise Read Timeout for replica that connects.
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
@@ -265,17 +295,90 @@
   */
  public void stopServer(ServerHandler handler)
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In RS " + this.replicationServer.getMonitorInstanceName() +
        " for " + baseDn + " " +
        " stopServer " + handler.getMonitorInstanceName());
    handler.stopHandler();
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
    }
    else
    {
      connectedServers.remove(handler.getServerId());
    }
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
    mayResetGenerationId();
    // Update the remote replication servers with our list
    // of connected LDAP servers
    sendReplServerInfo();
  }
  /**
   * Resets the generationId for this domain if there is no LDAP
   * server currently connected and if the generationId has never
   * been saved.
   */
  protected void mayResetGenerationId()
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In RS " + this.replicationServer.getMonitorInstanceName() +
        " for " + baseDn + " " +
        " mayResetGenerationId generationIdSavedStatus=" +
        generationIdSavedStatus);
    // If there is no more any LDAP server connected to this domain in the
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    if (connectedServers.isEmpty())
    {
      for (ServerHandler rsh : replicationServers.values())
      {
        if (generationId != rsh.getGenerationId())
        {
          if (debugEnabled())
            TRACER.debugInfo(
                "In RS " + this.replicationServer.getMonitorInstanceName() +
                " for " + baseDn + " " +
                " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
                " thas different genId");
        }
        else
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          {
            lDAPServersConnectedInTheTopology = true;
            if (debugEnabled())
              TRACER.debugInfo(
                  "In RS " + this.replicationServer.getMonitorInstanceName() +
                  " for " + baseDn + " " +
                  " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
              " has servers connected to it - will not reset generationId");
          }
        }
      }
    }
    else
    {
      lDAPServersConnectedInTheTopology = true;
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " for " + baseDn + " " +
          " has servers connected to it - will not reset generationId");
    }
    if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
    {
      setGenerationId(-1, false);
    }
  }
@@ -321,7 +424,7 @@
      // Update this server with the list of LDAP servers
      // already connected
      handler.sendInfo(
          new ReplServerInfoMessage(getConnectedLDAPservers()));
          new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
      return true;
    }
@@ -437,17 +540,20 @@
  }
  /**
   * creates a new ReplicationDB with specified identifier.
   * @param id the identifier of the new ReplicationDB.
   * @param db the new db.
   * Sets the provided DbHandler associated to the provided serverId.
   *
   * @param serverId  the serverId for the server to which is
   *                  associated the Dbhandler.
   * @param dbHandler the dbHandler associated to the serverId.
   *
   * @throws DatabaseException If a database error happened.
   */
  public void newDb(short id, DbHandler db) throws DatabaseException
  public void setDbHandler(short serverId, DbHandler dbHandler)
  throws DatabaseException
  {
    synchronized (sourceDbHandlers)
    {
      sourceDbHandlers.put(id , db);
      sourceDbHandlers.put(serverId , dbHandler);
    }
  }
@@ -557,7 +663,8 @@
  }
  /**
   * Process an InitializeRequestMessage.
   * Processes a message coming from one server in the topology
   * and potentially forwards it to one or all other servers.
   *
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
@@ -565,6 +672,23 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // A replication server is not expected to be the destination
    // of a routable message except for an error message.
    if (msg.getDestination() == this.replicationServer.getServerId())
    {
      if (msg instanceof ErrorMessage)
      {
        ErrorMessage errorMsg = (ErrorMessage)msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
                   errorMsg.getDetails()));
      }
      else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
            msg.getClass().getCanonicalName()));
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
@@ -572,9 +696,13 @@
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
      mb.append("serverID:" + msg.getDestination());
      mb.append(" unreachable server ID=" + msg.getDestination());
      mb.append(" unroutable message =" + msg);
      ErrorMessage errMsg = new ErrorMessage(
              msg.getsenderID(), mb.toMessage());
          this.replicationServer.getServerId(),
          msg.getsenderID(),
          mb.toMessage());
      try
      {
        senderHandler.send(errMsg);
@@ -583,8 +711,8 @@
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an error to this server.
         * Log an error and close the connection to the sender server.
         * An error happened trying to send an error msg to this server.
         * Log an error and close the connection to this server.
         */
        MessageBuilder mb2 = new MessageBuilder();
        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -788,7 +916,7 @@
    private void sendReplServerInfo()
    {
      ReplServerInfoMessage info =
        new ReplServerInfoMessage(getConnectedLDAPservers());
        new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
      for (ServerHandler handler : replicationServers.values())
      {
        try
@@ -811,6 +939,26 @@
    }
    /**
     * Get the generationId associated to this domain.
     *
     * @return The generationId
     */
    public long getGenerationId()
    {
      return generationId;
    }
    /**
     * Get the generationId saved status.
     *
     * @return The generationId saved status.
     */
    public boolean getGenerationIdSavedStatus()
    {
      return generationIdSavedStatus;
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
@@ -822,4 +970,162 @@
    {
      handler.setReplServerInfo(infoMsg);
    }
    /**
     * Sets the provided value as the new in memory generationId.
     *
     * @param generationId The new value of generationId.
     * @param savedStatus  The saved status of the generationId.
     */
    synchronized public void setGenerationId(long generationId,
        boolean savedStatus)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " RCache.set GenerationId=" + generationId);
      if (generationId == this.generationId)
        return;
      if (this.generationId>0)
      {
        for (ServerHandler handler : connectedServers.values())
        {
          handler.resetGenerationId();
        }
      }
      this.generationId = generationId;
      this.generationIdSavedStatus = savedStatus;
    }
    /**
     * Resets the generationID.
     *
     * @param senderHandler The handler associated to the server
     *        that requested to reset the generationId.
     */
    public void resetGenerationId(ServerHandler senderHandler)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " RCache.resetGenerationId");
      // Notifies the others LDAP servers that from now on
      // they have the bad generationId
      for (ServerHandler handler : connectedServers.values())
      {
        handler.resetGenerationId();
      }
      // Propagates the reset message to the others replication servers
      // dealing with the same domain.
      if (senderHandler.isLDAPserver())
      {
        for (ServerHandler handler : replicationServers.values())
        {
          try
          {
            handler.sendGenerationId(new ResetGenerationId());
          }
          catch (IOException e)
          {
            logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
                get(handler.getMonitorInstanceName()));
           }
        }
      }
      // Reset the localchange and state db for the current domain
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          try
          {
            dbHandler.clear();
          }
          catch (Exception e)
          {
            // TODO: i18n
            logError(Message.raw(
                "Exception caught while clearing dbHandler:" +
                e.getLocalizedMessage()));
          }
        }
        sourceDbHandlers.clear();
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " baseDN=" + baseDn +
              " The source db handler has been cleared");
      }
      try
      {
        replicationServer.clearGenerationId(baseDn);
      }
      catch (Exception e)
      {
        // TODO: i18n
        logError(Message.raw(
            "Exception caught while clearing generationId:" +
            e.getLocalizedMessage()));
      }
      // Reset the in memory domain generationId
      generationId = -1;
    }
    /**
     * Returns whether the provided server is in degraded
     * state due to the fact that the peer server has an invalid
     * generationId for this domain.
     *
     * @param serverId The serverId for which we want to know the
     *                 the state.
     * @return Whether it is degraded or not.
     */
    public boolean isDegradedDueToGenerationId(short serverId)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDN=" + baseDn +
            " isDegraded serverId=" + serverId +
            " given local generation Id=" + this.generationId);
      ServerHandler handler = replicationServers.get(serverId);
      if (handler == null)
      {
        handler = connectedServers.get(serverId);
        if (handler == null)
        {
          return false;
        }
      }
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDN=" + baseDn +
            " Compute degradation of serverId=" + serverId +
            " LS server generation Id=" + handler.getGenerationId());
      return (handler.getGenerationId() != this.generationId);
    }
    /**
     * Return the associated replication server.
     * @return The replication server.
     */
    public ReplicationServer getReplicationServer()
    {
      return replicationServer;
    }
}
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -62,11 +62,11 @@
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param serverId Identifier of the LDAP server.
   * @param baseDn baseDn of the LDAP server.
   * @param replicationServer the ReplicationServer that needs to be shutdown
   * @param dbenv the Db encironemnet to use to create the db
   * @throws DatabaseException if a database problem happened
   * @param serverId The identifier of the LDAP server.
   * @param baseDn The baseDn of the replication domain.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @throws DatabaseException If a database problem happened.
   */
  public ReplicationDB(Short serverId, DN baseDn,
                     ReplicationServer replicationServer,
@@ -77,8 +77,10 @@
    this.baseDn = baseDn;
    this.dbenv = dbenv;
    this.replicationServer = replicationServer;
    db = dbenv.getOrAddDb(serverId, baseDn);
    // Get or create the associated Replicationcache and Db.
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationCache(baseDn, true).getGenerationId());
  }
  /**
@@ -472,4 +474,19 @@
      cursor.delete();
    }
  }
  /**
   * Clears this change DB from the changes it contains.
   *
   * @throws Exception Throws an exception it occurs.
   * @throws DatabaseException Throws a DatabaseException when it occurs.
   */
  public void clear() throws Exception, DatabaseException
  {
    // Clears the changes
    dbenv.clearDb(this.toString());
    // Clears the reference to this serverID
    dbenv.clearServerId(baseDn, serverId);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -25,10 +25,12 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -58,6 +60,12 @@
  private Environment dbEnvironment = null;
  private Database stateDb = null;
  private ReplicationServer replicationServer = null;
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Initialize this class.
@@ -117,16 +125,120 @@
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    try
    {
      /*
       *  Get the domain base DN/ generationIDs records
       */
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        try
        {
          String stringData = new String(data.getData(), "UTF-8");
          String[] str = stringData.split(" ", 2);
          short serverId = new Short(str[0]);
          if (debugEnabled())
            TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Read tag baseDn generationId=" + stringData);
          String[] str = stringData.split(FIELD_SEPARATOR, 3);
          if (str[0].equals(GENERATION_ID_TAG))
          {
            long generationId=-1;
            DN baseDn;
            try
            {
              // <generationId>
              generationId = new Long(str[1]);
            }
            catch (NumberFormatException e)
            {
              // should never happen
              // TODO: i18n
              throw new ReplicationDBException(Message.raw(
                  "replicationServer state database has a wrong format: " +
                  e.getLocalizedMessage()
                  + "<" + str[1] + ">"));
            }
            // <baseDn>
            baseDn = null;
            try
            {
              baseDn = DN.decode(str[2]);
            } catch (DirectoryException e)
            {
              Message message =
                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
              logError(message);
            }
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " Has read baseDn=" + baseDn
                + " generationId=" + generationId);
            replicationServer.getReplicationCache(baseDn, true).
            setGenerationId(generationId, true);
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw("need UTF-8 support"));
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      /*
       * Get the server Id / domain base DN records
       */
      status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        String stringData = null;
        try
        {
          stringData = new String(data.getData(), "UTF-8");
        }
        catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
          "need UTF-8 support"));
        }
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " Read serverId BaseDN=" + stringData);
        String[] str = stringData.split(FIELD_SEPARATOR, 2);
        if (!str[0].equals(GENERATION_ID_TAG))
        {
          short serverId = -1;
          try
          {
            // <serverId>
            serverId = new Short(str[0]);
          } catch (NumberFormatException e)
          {
            // should never happen
            // TODO: i18n
            throw new ReplicationDBException(Message.raw(
                "replicationServer state database has a wrong format: " +
                e.getLocalizedMessage()
                + "<" + str[0] + ">"));
          }
          // <baseDn>
          DN baseDn = null;
          try
          {
@@ -134,113 +246,302 @@
          } catch (DirectoryException e)
          {
            Message message =
                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
              ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
            logError(message);
          }
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " Has read: baseDn=" + baseDn
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this);
          replicationServer.getReplicationCache(baseDn).newDb(serverId,
                                                            dbHandler);
        } catch (NumberFormatException e)
        {
          // should never happen
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
              "replicationServer state database has a wrong format"));
        } catch (UnsupportedEncodingException e)
        {
          // should never happens
          // TODO: i18n
          throw new ReplicationDBException(Message.raw(
                  "need UTF-8 support"));
            new DbHandler(serverId, baseDn, replicationServer, this, 1);
          replicationServer.getReplicationCache(baseDn, true).
          setDbHandler(serverId, dbHandler);
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      cursor.close();
    } catch (DatabaseException dbe) {
    }
    catch (DatabaseException dbe)
    {
      cursor.close();
      throw dbe;
    }
  }
  /**
   * Find or create the database used to store changes from the server
   * with the given serverId and the given baseDn.
   * @param serverId The server id that identifies the server.
   * @param baseDn The baseDn that identifies the server.
   * @return the Database.
   * @throws DatabaseException in case of underlying Exception.
   */
  public Database getOrAddDb(Short serverId, DN baseDn)
                  throws DatabaseException
  {
    try
    /**
     * Finds or creates the database used to store changes from the server
     * with the given serverId and the given baseDn.
     *
     * @param serverId     The server id that identifies the server.
     * @param baseDn       The baseDn that identifies the domain.
     * @param generationId The generationId associated to this domain.
     * @return the Database.
     * @throws DatabaseException in case of underlying Exception.
     */
    public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
    throws DatabaseException
    {
      String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
      byte[] byteId;
      byteId = stringId.getBytes("UTF-8");
      // Open the database. Create it if it does not already exist.
      DatabaseConfig dbConfig = new DatabaseConfig();
      dbConfig.setAllowCreate(true);
      dbConfig.setTransactional(true);
      Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
      DatabaseEntry key = new DatabaseEntry();
      key.setData(byteId);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
      if (debugEnabled())
        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
          serverId + " " + baseDn + " " + generationId);
      try
      {
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        try {
          data.setData(byteId);
          stateDb.put(txn, key, data);
          txn.commitWriteNoSync();
        } catch (DatabaseException dbe)
        String stringId = serverId.toString() + FIELD_SEPARATOR
                          + baseDn.toNormalizedString();
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          // Abort the txn and propagate the Exception to the caller
          txn.abort();
          throw dbe;
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            if (debugEnabled())
              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
                " serverId/Domain=<"+stringId+">");
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
        generationId.toString() + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        byteId = stringId.getBytes("UTF-8");
        byte[] dataByteId;
        dataByteId = dataStringId.getBytes("UTF-8");
        key = new DatabaseEntry();
        key.setData(byteId);
        data = new DatabaseEntry();
        status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(dataByteId);
            if (debugEnabled())
              TRACER.debugInfo(
                  "Created in the state Db record Tag/Domain/GenId key=" +
                  stringId + " value=" + dataStringId);
            stateDb.put(txn, key, data);
            txn.commitWriteNoSync();
          } catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        return db;
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
        return null;
      }
    }
    /**
     * Creates a new transaction.
     *
     * @return the transaction.
     * @throws DatabaseException in case of underlying database Exception.
     */
    public Transaction beginTransaction() throws DatabaseException
    {
      return dbEnvironment.beginTransaction(null, null);
    }
    /**
     * Shutdown the Db environment.
     */
    public void shutdown()
    {
      try
      {
        stateDb.close();
        dbEnvironment.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
    }
    /**
     * Clears the provided generationId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     *
     */
    public void clearGenerationId(DN baseDn)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
          " clearGenerationId " + baseDn);
      try
      {
        // Deletes the record domain base Dn/ generationId in the stateDb
        String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
          baseDn.toNormalizedString();
        byte[] byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if ((status == OperationStatus.SUCCESS) ||
            (status == OperationStatus.KEYEXIST))
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try
          {
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            if (debugEnabled())
              TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " clearGenerationId (" +
                baseDn +") succeeded.");
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
        else
        {
          // TODO : should have a better error logging
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " clearGenerationId ("+ baseDn + " failed" + status.toString());
        }
      }
      return db;
    } catch (UnsupportedEncodingException e)
    {
      // can't happen
      return null;
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
    }
  }
  /**
   * Creates a new transaction.
   *
   * @return the transaction.
   * @throws DatabaseException in case of underlying database Exception.
   */
  public Transaction beginTransaction() throws DatabaseException
  {
    return dbEnvironment.beginTransaction(null, null);
  }
    /**
     * Clears the provided serverId associated to the provided baseDn
     * from the state Db.
     *
     * @param baseDn The baseDn for which the generationID must be cleared.
     * @param serverId The serverId to remove from the Db.
     *
     */
    public void clearServerId(DN baseDn, Short serverId)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
      try
      {
        String stringId = serverId.toString() + FIELD_SEPARATOR
                         + baseDn.toNormalizedString();
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    try
    {
      stateDb.close();
      dbEnvironment.close();
    } catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
        // Deletes the record serverId/domain base Dn in the stateDb
        byte[] byteId;
        byteId = stringId.getBytes("UTF-8");
        DatabaseEntry key = new DatabaseEntry();
        key.setData(byteId);
        DatabaseEntry data = new DatabaseEntry();
        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
        if (status != OperationStatus.NOTFOUND)
        {
          Transaction txn = dbEnvironment.beginTransaction(null, null);
          try {
            data.setData(byteId);
            stateDb.delete(txn, key);
            txn.commitWriteNoSync();
            if (debugEnabled())
              TRACER.debugInfo(
                  " In " + this.replicationServer.getMonitorInstanceName() +
                  " clearServerId() succeeded " + baseDn + " " +
                  serverId);
          }
          catch (DatabaseException dbe)
          {
            // Abort the txn and propagate the Exception to the caller
            txn.abort();
            throw dbe;
          }
        }
      }
      catch (UnsupportedEncodingException e)
      {
        // can't happen
      }
      catch (DatabaseException dbe)
      {
        // can't happen
      }
    }
  }
    /**
     * Clears the database.
     *
     * @param databaseName The name of the database to clear.
     */
    public final void clearDb(String databaseName)
    {
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              "clearDb" + databaseName);
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
      }
      catch (DatabaseException dbe)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
            dbe.getLocalizedMessage()));
        logError(mb.toMessage());
      }
    }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,13 +25,13 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -61,6 +61,8 @@
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import com.sleepycat.je.DatabaseException;
@@ -101,12 +103,17 @@
  private int queueSize;
  private String dbDirname = null;
  private long trimAge; // the time (in sec) after which the  changes must
                        // be deleted from the persistent storage.
  private int replicationPort;
                        // de deleted from the persistent storage.
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -191,7 +198,7 @@
        // The socket has probably been closed as part of the
        // shutdown or changing the port number process.
        // just log debug information and loop.
        Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
        Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
        logError(message);
      }
    }
@@ -272,6 +279,10 @@
    String hostname = serverURL.substring(0, separator);
    boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
    if (debugEnabled())
      TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
               " connects to " + serverURL);
    try
    {
      InetSocketAddress ServerAddr = new InetSocketAddress(
@@ -329,22 +340,41 @@
      listenSocket.bind(new InetSocketAddress(changelogPort));
      /*
       * create working threads
       * creates working threads
       * We must first connect, then start to listen.
       */
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
      listenThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect threads");
      connectThread =
        new ReplicationServerConnectThread("Replication Server Connect", this);
      connectThread.start();
      // FIXME : Is it better to have the time to receive the ReplServerInfo
      // from all the other replication servers since this info is necessary
      // to route an early received total update request.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen threads");
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
      listenThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
    } catch (DatabaseException e)
    {
      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname);
      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
        getFileForPath(dbDirname).getAbsolutePath());
      logError(message);
    } catch (ReplicationDBException e)
    {
      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname);
      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
          e.getLocalizedMessage());
      logError(message);
    } catch (UnknownHostException e)
    {
@@ -362,18 +392,22 @@
   * Get the ReplicationCache associated to the base DN given in parameter.
   *
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @param create Specifies whether to create the ReplicationCache if it does
   *        not already exist.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  public ReplicationCache getReplicationCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn, boolean create)
  {
    ReplicationCache replicationCache;
    synchronized (baseDNs)
    {
      replicationCache = baseDNs.get(baseDn);
      if (replicationCache == null)
      if ((replicationCache == null) && (create))
      {
        replicationCache = new ReplicationCache(baseDn, this);
      baseDNs.put(baseDn, replicationCache);
        baseDNs.put(baseDn, replicationCache);
      }
    }
    return replicationCache;
@@ -384,6 +418,9 @@
   */
  public void shutdown()
  {
    if (shutdown)
      return;
    shutdown = true;
    // shutdown the connect thread
@@ -404,6 +441,12 @@
      // replication Server service is closing anyway.
    }
    // shutdown the listen thread
    if (listenThread != null)
    {
      listenThread.interrupt();
    }
    // shutdown all the ChangelogCaches
    for (ReplicationCache replicationCache : baseDNs.values())
    {
@@ -424,13 +467,36 @@
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @param generationId The generationId for this server and this domain.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */
  DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
  public DbHandler newDbHandler(short id, DN baseDn, long generationId)
  throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv);
    return new DbHandler(id, baseDn, this, dbEnv, generationId);
  }
  /**
   * Clears the generationId for the domain related to the provided baseDn.
   * @param  baseDn The baseDn for which to delete the generationId.
   * @throws DatabaseException When it occurs.
   */
  public void clearGenerationId(DN baseDn)
  throws DatabaseException
  {
    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch(Exception e)
    {
      TRACER.debugInfo(
          "In RS <" + getMonitorInstanceName() +
          " Exception in clearGenerationId" +
          stackTraceToSingleLineString(e) + e.getLocalizedMessage());
    }
  }
  /**
@@ -618,7 +684,50 @@
    Attribute bases = new Attribute(baseType, "base-dn", baseValues);
    attributes.add(bases);
    // Publish to monitor the generation ID by domain
    AttributeType generationIdType=
      DirectoryServer.getAttributeType("base-dn-generation-id", true);
    LinkedHashSet<AttributeValue> generationIdValues =
      new LinkedHashSet<AttributeValue>();
    for (DN base : baseDNs.keySet())
    {
      long generationId=-1;
      ReplicationCache cache = getReplicationCache(base, false);
      if (cache != null)
        generationId = cache.getGenerationId();
      generationIdValues.add(new AttributeValue(generationIdType,
          base.toString() + " " + generationId));
    }
    Attribute generationIds = new Attribute(generationIdType, "generation-id",
        generationIdValues);
    attributes.add(generationIds);
    return attributes;
  }
  /**
   * Get the value of generationId for the replication domain
   * associated with the provided baseDN.
   *
   * @param baseDN The baseDN of the domain.
   * @return The value of the generationID.
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationCache rc = this.getReplicationCache(baseDN, false);
    if (rc!=null)
      return rc.getGenerationId();
    return -1;
  }
  /**
   * Get the serverId for this replication server.
   *
   * @return The value of the serverId.
   *
   */
  public short getServerId()
  {
    return serverId;
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -51,6 +51,8 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -67,6 +69,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -143,6 +146,7 @@
  private short replicationServerId;
  private short protocolVersion;
  private long generationId=-1;
  /**
@@ -189,7 +193,7 @@
   * Then create the reader and writer thread.
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   *               null if this is an incoming connection (listen).
   * @param replicationServerId The identifier of the replicationServer that
   *                            creates this server handler.
   * @param replicationServerURL The URL of the replicationServer that creates
@@ -206,22 +210,34 @@
                    int windowSize, boolean sslEncryption,
                    ReplicationServer replicationServer)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
                " starts a new LS or RS " +
                ((baseDn == null)?"incoming connection":"outgoing connection"));
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId=-1;
    try
    {
      if (baseDn != null)
      {
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        replicationCache = replicationServer.getReplicationCache(baseDn);
        // Get or create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(baseDn, true);
        localGenerationId = replicationCache.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState,
                                    protocolVersion, sslEncryption);
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
        session.publish(msg);
      }
@@ -229,9 +245,10 @@
      ReplicationMessage msg = session.receive();
      if (msg instanceof ServerStartMessage)
      {
        // The remote server is an LDAP Server
        // The remote server is an LDAP Server.
        ServerStartMessage receivedMsg = (ServerStartMessage) msg;
        generationId = receivedMsg.getGenerationId();
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        serverId = receivedMsg.getServerId();
@@ -281,15 +298,69 @@
        serverIsLDAPserver = true;
        // This an incoming connection. Publish our start message
        replicationCache = replicationServer.getReplicationCache(this.baseDn);
        // Get or Create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(this.baseDn,
            true);
        localGenerationId = replicationCache.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState,
                                    protocolVersion, sslEncryption);
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negociation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
                   ", SH received START from LS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
        }
        /*
         * If we have already a generationID set for the domain
         * then
         *   if the connecting replica has not the same
         *   then it is degraded locally and notified by an error message
         * else
         *   we set the generationID from the one received
         *   (unsaved yet on disk . will be set with the 1rst change received)
         */
        if (localGenerationId>0)
        {
          if (generationId != localGenerationId)
          {
            Message message = NOTE_BAD_GENERATION_ID.get(
                receivedMsg.getBaseDn().toNormalizedString(),
                Short.toString(receivedMsg.getServerId()),
                Long.toString(generationId),
                Long.toString(localGenerationId));
            ErrorMessage errorMsg =
              new ErrorMessage(replicationServerId, serverId, message);
            session.publish(errorMsg);
          }
        }
        else
        {
          replicationCache.setGenerationId(generationId, false);
        }
      }
      else if (msg instanceof ReplServerStartMessage)
      {
@@ -297,6 +368,7 @@
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        generationId = receivedMsg.getGenerationId();
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        int separator = serverURL.lastIndexOf(':');
@@ -306,7 +378,10 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          replicationCache = replicationServer.getReplicationCache(this.baseDn);
          // Get or create the ReplicationCache
          replicationCache = replicationServer.getReplicationCache(this.baseDn,
              true);
          localGenerationId = replicationCache.getGenerationId();
          ServerState serverState = replicationCache.getDbServerState();
          // The session initiator decides whether to use SSL.
@@ -317,7 +392,9 @@
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState,
                                       protocolVersion, sslEncryption);
                                       protocolVersion,
                                       localGenerationId,
                                       sslEncryption);
          session.publish(outMsg);
        }
        else
@@ -326,6 +403,107 @@
        }
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negociation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
                   ", SH received START from RS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
        }
        // if the remote RS and the local RS have the same genID
        // then it's ok and nothing else to do
        if (generationId == localGenerationId)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() + " RS with serverID=" + serverId +
              " is connected with the right generation ID");
          }
        }
        else
        {
          if (localGenerationId>0)
          {
            // if the local RS is initialized
            if (generationId>0)
            {
              // if the remote RS is initialized
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationCache.getGenerationIdSavedStatus())
                {
                  // it the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
                  // then  we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
                else
                {
                  // The present RS has never received changes regarding its
                  // gen ID.
                  //
                  // Example case:
                  // - we are in RS1
                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                  // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
                  // - we are in RS1 and we receive a START msg from RS2
                  // - Each RS keeps its genID / is degraded and when LS2 will
                  //   be populated from LS1 everything will becomes ok.
                  //
                  // Issue:
                  // FIXME : Would it be a good idea in some cases to just
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non nul state and
                  //         we have a nul state ?
                  // replicationCache.setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
              }
            }
            else
            {
              // The remote has no genId. We don't change anything for the
              // current RS.
            }
          }
          else
          {
            // The local RS is not initialized - take the one received
            replicationCache.setGenerationId(generationId, false);
          }
        }
      }
      else
      {
@@ -333,12 +511,9 @@
        return;   // we did not recognize the message, ignore it
      }
      if (!sslEncryption)
      {
        session.stopEncryption();
      }
      replicationCache = replicationServer.getReplicationCache(this.baseDn);
      // Get or create the ReplicationCache
      replicationCache = replicationServer.getReplicationCache(this.baseDn,
          true);
      boolean started;
      if (serverIsLDAPserver)
@@ -352,10 +527,11 @@
      if (started)
      {
        writer = new ServerWriter(session, serverId, this, replicationCache);
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        reader = new ServerReader(session, serverId, this,
            replicationCache);
        writer = new ServerWriter(session, serverId, this, replicationCache);
        reader = new ServerReader(session, serverId, this, replicationCache);
        reader.start();
        writer.start();
@@ -377,6 +553,12 @@
        // the connection is not valid, close it.
        try
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
          }
          session.close();
        } catch (IOException e1)
        {
@@ -388,7 +570,8 @@
    {
      // some problem happened, reject the connection
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString()));
      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
          this.getMonitorInstanceName()));
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      try
@@ -399,7 +582,6 @@
        // ignore
      }
    }
    sendWindow = new Semaphore(sendWindowSize);
  }
  /**
@@ -720,6 +902,21 @@
   */
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  {
    /*
     * Ignore updates from a server that is degraded due to
     * its inconsistent generationId
     */
    long referenceGenerationId = replicationCache.getGenerationId();
    if ((referenceGenerationId>0) &&
        (referenceGenerationId != generationId))
    {
      logError(ERR_IGNORING_UPDATE_TO.get(
               update.getDn(),
               this.getMonitorInstanceName()));
      return;
    }
    synchronized (msgQueue)
    {
      /*
@@ -1164,7 +1361,7 @@
    if (serverIsLDAPserver)
      return "Remote LDAP Server " + str;
    else
      return "Remote Replication Server " + str;
      return "Remote Repl Server " + str;
  }
  /**
@@ -1261,7 +1458,10 @@
    attributes.add(attr);
    attributes.add(new Attribute("ssl-encryption",
                                 String.valueOf(session.isEncrypted())));
        String.valueOf(session.isEncrypted())));
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
    return attributes;
  }
@@ -1385,9 +1585,10 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
                 msg + " from " + serverId);
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                 getMonitorInstanceName() +
                 " SH for remote server " + this.getMonitorInstanceName() +
                 " processes received msg=" + msg);
    replicationCache.process(msg, this);
  }
@@ -1401,6 +1602,12 @@
   public void sendInfo(ReplServerInfoMessage info)
   throws IOException
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sends message=" + info);
     session.publish(info);
   }
@@ -1412,7 +1619,13 @@
    */
   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
     remoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
   }
   /**
@@ -1458,8 +1671,10 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " sends message=" + msg);
    session.publish(msg);
  }
@@ -1492,4 +1707,48 @@
      checkWindow();
    }
  }
  /**
   * Returns the value of generationId for that handler.
   * @return The value of the generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
  /**
   * Resets the generationId for this domain.
   */
  public void resetGenerationId()
  {
    // Notify the peer that it is now invalid regarding the generationId
    // We are now waiting a startServer message from this server with
    // a valid generationId.
    try
    {
      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
      ErrorMessage errorMsg =
        new ErrorMessage(serverId, replicationServerId, message);
      session.publish(errorMsg);
    }
    catch (Exception e)
    {
      // FIXME Log exception when sending reset error message
    }
  }
  /**
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
   * @param  msg         The GenerationIdMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
   public void sendGenerationId(ResetGenerationId msg)
   throws IOException
   {
     session.publish(msg);
   }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -39,6 +40,7 @@
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -76,7 +78,7 @@
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read changes.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationCache The ReplicationCache for this server reader.
   */
@@ -97,14 +99,11 @@
  {
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader starting " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader starting " + serverId);
      }
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
    }
    /*
     * wait on input stream
@@ -116,6 +115,25 @@
      {
        ReplicationMessage msg = session.receive();
        if (debugEnabled())
        {
          if (handler.isReplicationServer())
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from RS server with serverId=" + serverId +
                " receives " + msg);
          }
          else
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from LDAP server with serverId=" + serverId +
                " receives " + msg);
          }
        }
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
@@ -124,9 +142,22 @@
        }
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.decAndCheckWindow();
          replicationCache.put(update, handler);
          // Ignore update received from a replica with
          // a bad generation ID
          long referenceGenerationId = replicationCache.getGenerationId();
          if ((referenceGenerationId>0) &&
              (referenceGenerationId != handler.getGenerationId()))
          {
            logError(ERR_IGNORING_UPDATE_FROM.get(
                msg.toString(),
                handler.getMonitorInstanceName()));
          }
          else
          {
            UpdateMessage update = (UpdateMessage) msg;
            handler.decAndCheckWindow();
            replicationCache.put(update, handler);
          }
        }
        else if (msg instanceof WindowMessage)
        {
@@ -159,6 +190,11 @@
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
        else if (msg instanceof ResetGenerationId)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationCache.resetGenerationId(this.handler);
        }
        else if (msg instanceof WindowProbe)
        {
          WindowProbe windowProbeMsg = (WindowProbe) msg;
@@ -168,6 +204,52 @@
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.setReplServerInfo(infoMsg);
          if (debugEnabled())
          {
            if (handler.isReplicationServer())
              TRACER.debugInfo(
               "In RS " + replicationCache.getReplicationServer().
               getServerId() +
               " Receiving replServerInfo from " + handler.getServerId() +
               " baseDn=" + replicationCache.getBaseDn() +
               " genId=" + infoMsg.getGenerationId());
          }
          if (replicationCache.getGenerationId()<0)
          {
            // Here is the case where a ReplicationServer receives from
            // another ReplicationServer the generationId for a domain
            // for which the generation ID has never been set.
            replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
          }
          else
          {
            if (infoMsg.getGenerationId()<0)
            {
              // Here is the case where another ReplicationServer
              // signals that it has no generationId set for the domain.
              // If we have generationId set locally and no server currently
              // connected for that domain in the topology then we may also
              // reset the generationId localy.
              replicationCache.mayResetGenerationId();
            }
            if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
            {
              Message message = NOTE_BAD_GENERATION_ID.get(
                  replicationCache.getBaseDn().toNormalizedString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(infoMsg.getGenerationId()),
                  Long.toString(replicationCache.getGenerationId()));
              ErrorMessage errorMsg = new ErrorMessage(
                  replicationCache.getReplicationServer().getServerId(),
                  handler.getServerId(),
                  message);
              session.publish(errorMsg);
            }
          }
        }
        else if (msg == null)
        {
@@ -187,21 +269,40 @@
       * Log a message and exit from this loop
       * So that this handler is stopped.
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
          e.getCause());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
      logError(message);
    } catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the conenction.
       * close the connection.
       */
      Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(message);
    } catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the conenction.
       * close the connection.
       */
      Message message = NOTE_READER_EXCEPTION.get(handler.toString());
      logError(message);
@@ -213,6 +314,12 @@
       * happen.
       * Attempt to close the socket and stop the server handler.
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CLOSE serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
      try
      {
        session.close();
@@ -223,15 +330,11 @@
      replicationCache.stopServer(handler);
    }
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader stopping " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader stopping " + serverId);
      }
    }
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?"RS":"LDAP") +
          " server reader stopped for serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
  }
}
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
@@ -97,12 +98,35 @@
        TRACER.debugInfo("LDAP server writer starting " + serverId);
      }
    }
    try {
    try
    {
      while (true)
      {
        UpdateMessage update = replicationCache.take(this.handler);
        if (update == null)
          return;       /* this connection is closing */
        // Ignore update to be sent to a replica with a bad generation ID
        long referenceGenerationId = replicationCache.getGenerationId();
        if (referenceGenerationId != handler.getGenerationId())
        {
          logError(ERR_IGNORING_UPDATE_TO.get(
              update.getDn(),
              this.handler.getMonitorInstanceName()));
          continue;
        }
        if (debugEnabled())
        {
          TRACER.debugInfo(
            "In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() +
            ", writer to " + this.handler.getMonitorInstanceName() +
            " publishes" + update.toString() +
            " refgenId=" + referenceGenerationId +
            " server=" + handler.getServerId() +
            " generationId=" + handler.getGenerationId());
        }
        session.publish(update);
      }
    }
@@ -130,7 +154,8 @@
       * An unexpected error happened.
       * Log an error and close the connection.
       */
      Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString());
      Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
                        " " +  stackTraceToSingleLineString(e));
      logError(message);
    }
    finally {
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -129,7 +129,7 @@
    }
    try
    {
      domain.initializeTarget(target, this);
      domain.initializeRemote(target, this);
    }
    catch(DirectoryException de)
    {
opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -139,7 +139,7 @@
    try
    {
      // launch the import
      domain.initialize(source, this);
      domain.initializeFromRemote(source, this);
      synchronized(initState)
      {
opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java
New file
@@ -0,0 +1,128 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
/**
 * This class provides an implementation of a Directory Server task that can
 * be used to import data over the replication protocol from another
 * server hosting the same replication domain.
 */
public class SetGenerationIdTask extends Task
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  boolean isCompressed            = false;
  boolean isEncrypted             = false;
  boolean skipSchemaValidation    = false;
  String  domainString            = null;
  ReplicationDomain domain        = null;
  TaskState initState;
  private static final void debugInfo(String s)
  {
    if (debugEnabled())
    {
      // System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s));
      TRACER.debugInfo(s);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override public void initializeTask() throws DirectoryException
  {
    if (TaskState.isDone(getTaskState()))
    {
      return;
    }
    // FIXME -- Do we need any special authorization here?
    Entry taskEntry = getTaskEntry();
    AttributeType typeDomainBase;
    // Retrieves the replication domain
    typeDomainBase =
      getAttributeType(ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN, true);
    List<Attribute> attrList;
    attrList = taskEntry.getAttribute(typeDomainBase);
    domainString = TaskUtils.getSingleValueString(attrList);
    DN domainDN = DN.nullDN();
    try
    {
      domainDN = DN.decode(domainString);
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
      mb.append(e.getMessage());
      throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
          mb.toMessage());
    }
    domain = ReplicationDomain.retrievesReplicationDomain(domainDN);
  }
  /**
   * {@inheritDoc}
   */
  protected TaskState runTask()
  {
    debugInfo("setGenerationIdTask is starting on domain%s" +
        domain.getBaseDN());
    domain.resetGenerationId();
    debugInfo("setGenerationIdTask is ending SUCCESSFULLY");
    return TaskState.COMPLETED_SUCCESSFULLY;
  }
}
opends/src/server/org/opends/server/types/Schema.java
@@ -207,9 +207,13 @@
  // file.
  private long youngestModificationTime;
  // The Synchronization State.
  // The synchronization State.
  private LinkedHashSet<AttributeValue> synchronizationState = null;
  // The synchronization generationId.
  private LinkedHashSet<AttributeValue> synchronizationGenerationId
    = null;
  /**
@@ -2766,6 +2770,11 @@
      dupSchema.synchronizationState =
        new LinkedHashSet<AttributeValue>(synchronizationState);
    }
    if (synchronizationGenerationId != null)
    {
      dupSchema.synchronizationGenerationId = new
        LinkedHashSet<AttributeValue>(synchronizationGenerationId);
    }
    return dupSchema;
  }
@@ -2786,6 +2795,28 @@
   *
   * @param  values  Synchronization state for this schema.
   */
  public void setSynchronizationGenerationId(
              LinkedHashSet<AttributeValue> values)
  {
    synchronizationGenerationId = values;
  }
  /**
   * Retrieves the Synchronization generationId for this schema.
   *
   * @return  The Synchronization generationId for this schema.
   */
  public LinkedHashSet<AttributeValue>
    getSynchronizationGenerationId()
  {
    return synchronizationGenerationId;
  }
  /**
   * Sets the Synchronization state for this schema.
   *
   * @param  values  Synchronization state for this schema.
   */
  public void setSynchronizationState(
              LinkedHashSet<AttributeValue> values)
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
New file
@@ -0,0 +1,1401 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.net.SocketTimeoutException;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests contained here:
 *
 * - testSingleRS : test generation ID setting with different servers and one
 *   Replication server.
 *
 * - testMultiRS : tests generation ID propagatoion with more than one
 *   Replication server.
 *
 */
public class GenerationIdTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private static final String baseDnStr = "dc=example,dc=com";
  private static final String baseSnStr = "genidcom";
  private static final int   WINDOW_SIZE = 10;
  private static final int   CHANGELOG_QUEUE_SIZE = 100;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short changelog1ID = 11;
  private static final short changelog2ID = 12;
  private static final short changelog3ID = 13;
  private DN baseDn;
  private ReplicationBroker broker2 = null;
  private ReplicationBroker broker3 = null;
  private ReplicationServer replServer1 = null;
  private ReplicationServer replServer2 = null;
  private ReplicationServer replServer3 = null;
  private boolean emptyOldChanges = true;
  ReplicationDomain replDomain = null;
  private Entry taskInitRemoteS2;
  SocketSession ssSession = null;
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  private static int[] replServerPort = new int[20];
  /**
   * A temporary LDIF file containing some test entries.
   */
  private File ldifFile;
  /**
   * A temporary file to contain rejected entries.
   */
  private File rejectFile;
  /**
   * A makeldif template used to create some test entries.
   */
  private static String diff = "";
  private static String[] template = new String[] {
    "define suffix=" + baseDnStr,
    "define maildomain=example.com",
    "define numusers=11",
    "",
    "branch: [suffix]",
    "",
    "branch: ou=People,[suffix]",
    "subordinateTemplate: person:[numusers]",
    "",
    "template: person",
    "rdnAttr: uid",
    "objectClass: top",
    "objectClass: person",
    "objectClass: organizationalPerson",
    "objectClass: inetOrgPerson",
    "givenName: <first>",
    "sn: <last>",
    "cn: {givenName} {sn}",
    "initials: {givenName:1}<random:chars:" +
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ:1>{sn:1}",
    "employeeNumber: <sequential:0>",
    "uid: user.{employeeNumber}",
    "mail: {uid}@[maildomain]",
    "userPassword: password",
    "telephoneNumber: <random:telephone>",
    "homePhone: <random:telephone>",
    "pager: <random:telephone>",
    "mobile: <random:telephone>",
    "street: <random:numeric:5> <file:streets> Street",
    "l: <file:cities>",
    "st: <file:states>",
    "postalCode: <random:numeric:5>",
    "postalAddress: {cn}${street}${l}, {st}  {postalCode}",
    "description: This is the description for {cn} " + diff,
  ""};
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  protected void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode(baseDnStr);
    updatedEntries = newLDIFEntries();
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add replServerEntry
    configureReplication();
    taskInitRemoteS2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server2ID);
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
  // Tests that entries have been written in the db
  private int testEntriesInDb()
  {
    debugInfo("TestEntriesInDb");
    short found = 0;
    for (String entry : updatedEntries)
    {
      int dns = entry.indexOf("dn: ");
      int dne = entry.indexOf("dc=com");
      String dn = entry.substring(dns+4,dne+6);
      debugInfo("Search Entry: " + dn);
      DN entryDN = null;
      try
      {
        entryDN = DN.decode(dn);
      }
      catch(Exception e)
      {
        debugInfo("TestEntriesInDb/" + e);
      }
      try
      {
        Entry resultEntry = getEntry(entryDN, 1000, true);
        if (resultEntry==null)
        {
          debugInfo("Entry not found <" + dn + ">");
        }
        else
        {
          debugInfo("Entry found <" + dn + ">");
          found++;
        }
      }
      catch(Exception e)
      {
        debugInfo("TestEntriesInDb/", e);
      }
    }
    return found;
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * result code is not SUCCESS
   */
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage != null)
          {
            debugInfo(logMessages.get(0));
            debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  private void addTestEntriesToDB(String[] ldifEntries)
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      for (String ldifEntry : ldifEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperationBasis addOp = new AddOperationBasis(
            connection,
            InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            null,
            entry.getDN(),
            entry.getObjectClasses(),
            entry.getUserAttributes(),
            entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          debugInfo("addEntry: Failed" + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  {
    String[] entries =
    {
        "dn: " + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        "dn: ou=People," + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
        "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "telephonenumber: +1 408 555 1212\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
        "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: +1 408 555 1213\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
    };
    return entries;
  }
  private int receiveImport(ReplicationBroker broker, short serverID,
      String[] updatedEntries)
  {
    // Expect the broker to receive the entries
    ReplicationMessage msg;
    short entriesReceived = 0;
    while (true)
    {
      try
      {
        debugInfo("Broker " + serverID + " Wait for entry or done msg");
        msg = broker.receive();
        if (msg == null)
          break;
        if (msg instanceof InitializeTargetMessage)
        {
          debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
          entriesReceived = 0;
        }
        else if (msg instanceof EntryMessage)
        {
          EntryMessage em = (EntryMessage)msg;
          debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
          entriesReceived++;
        }
        else if (msg instanceof DoneMessage)
        {
          debugInfo("Broker " + serverID + "  receives done ");
          break;
        }
        else if (msg instanceof ErrorMessage)
        {
          ErrorMessage em = (ErrorMessage)msg;
          debugInfo("Broker " + serverID + "  receives ERROR "
              + em.toString());
          break;
        }
        else
        {
          debugInfo("Broker " + serverID + " receives and trashes " + msg);
        }
      }
      catch(Exception e)
      {
        debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
      }
    }
    if (updatedEntries != null)
    {
      assertTrue(entriesReceived == updatedEntries.length,
          " Received entries("+entriesReceived +
          ") == Expected entries("+updatedEntries.length+")");
    }
    return entriesReceived;
  }
  /**
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @param all         Specifies whether to coonect the created replication
   *                    server to the other replication servers in the test.
   * @return The new created replication server.
   */
  private ReplicationServer createReplicationServer(short changelogId,
      boolean all, String suffix)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if (changelogId==changelog1ID)
      {
        if (replServer1!=null)
          return replServer1;
      }
      else if (changelogId==changelog2ID)
      {
        if (replServer2!=null)
          return replServer2;
      }
      else if (changelogId==changelog3ID)
      {
        if (replServer3!=null)
          return replServer3;
      }
      if (all)
      {
        servers.add("localhost:" + getChangelogPort(changelog1ID));
        servers.add("localhost:" + getChangelogPort(changelog2ID));
        servers.add("localhost:" + getChangelogPort(changelog3ID));
      }
      int chPort = getChangelogPort(changelogId);
      String chDir = "genid"+changelogId+suffix+"Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Create a synchronized suffix in the current server providing the
   * replication Server ID.
   * @param changelogID
   */
  private void connectToReplServer(short changelogID)
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String synchroServerLdif =
        "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: " + baseSnStr + "\n"
        + "ds-cfg-synchronization-dn: " + baseDnStr + "\n"
        + "ds-cfg-changelog-server: localhost:"
        + getChangelogPort(changelogID)+"\n"
        + "ds-cfg-directory-server-id: " + server1ID + "\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        configEntryList.add(synchroServerEntry.getDN());
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      }
      if (replDomain != null)
      {
        debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
      }
    }
    catch(Exception e)
    {
      debugInfo("connectToReplServer", e);
      fail("connectToReplServer", e);
    }
  }
  /*
   * Disconnect DS from the replicationServer
   */
  private void disconnectFromReplServer(short changelogID)
  {
    try
    {
      // suffix synchronized
      String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
      synchroPluginStringDN;
      if (synchroServerEntry != null)
      {
        DN synchroServerDN = DN.decode(synchroServerStringDN);
        DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
        assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
        "Unable to delete the synchronized domain");
        synchroServerEntry = null;
        configEntryList.remove(configEntryList.indexOf(synchroServerDN));
      }
    }
    catch(Exception e)
    {
      fail("disconnectFromReplServer", e);
    }
  }
  private int getChangelogPort(short changelogID)
  {
    if (replServerPort[changelogID] == 0)
    {
      try
      {
        // Find  a free port for the replicationServer
        ServerSocket socket = TestCaseUtils.bindFreePort();
        replServerPort[changelogID] = socket.getLocalPort();
        socket.close();
      }
      catch(Exception e)
      {
        fail("Cannot retrieve a free port for replication server."
            + e.getMessage());
      }
    }
    return replServerPort[changelogID];
  }
  protected static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  private long readGenId()
  {
    long genId=-1;
    try
    {
      Entry resultEntry = getEntry(baseDn, 1000, true);
      if (resultEntry==null)
      {
        debugInfo("Entry not found <" + baseDn + ">");
      }
      else
      {
        debugInfo("Entry found <" + baseDn + ">");
        AttributeType synchronizationGenIDType =
          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
        List<Attribute> attrs =
          resultEntry.getAttribute(synchronizationGenIDType);
        if (attrs != null)
        {
          Attribute attr = attrs.get(0);
          LinkedHashSet<AttributeValue> values = attr.getValues();
          if (values.size() == 1)
          {
            genId = Long.decode(values.iterator().next().getStringValue());
          }
        }
      }
    }
    catch(Exception e)
    {
      fail("Exception raised in readGenId", e);
    }
    return genId;
  }
  private Entry getTaskImport()
  {
    Entry task = null;
    try
    {
      // Create a temporary test LDIF file.
      ldifFile = File.createTempFile("import-test", ".ldif");
      String resourcePath = DirectoryServer.getServerRoot() + File.separator +
      "config" + File.separator + "MakeLDIF";
      LdifFileWriter.makeLdif(ldifFile.getPath(), resourcePath, template);
      // Create a temporary rejects file.
      rejectFile = File.createTempFile("import-test-rejects", ".ldif");
      task = TestCaseUtils.makeEntry(
          "dn: ds-task-id=" + UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-import",
          "ds-task-class-name: org.opends.server.tasks.ImportTask",
          "ds-task-import-backend-id: userRoot",
          "ds-task-import-ldif-file: " + ldifFile.getPath(),
          "ds-task-import-reject-file: " + rejectFile.getPath(),
          "ds-task-import-overwrite-rejects: TRUE",
          "ds-task-import-exclude-attribute: description"
      );
    }
    catch(Exception e)
    {
    }
    return task;
  }
  private String createEntry(UUID uid)
  {
    String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
    return new String(
        "dn: "+ user2dn + "\n"
        + "objectClass: top\n" + "objectClass: person\n"
        + "objectClass: organizationalPerson\n"
        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
        + "homePhone: 951-245-7634\n"
        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
        + "mobile: 027-085-0537\n"
        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
        + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
        + "street: 17984 Thirteenth Street\n"
        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
        + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n");
  }
  static protected ReplicationMessage createAddMsg()
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
    String baseUUID = null;
    String user1dn;
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
    user1entryUUID = "33333333-3333-3333-3333-333333333333";
    user1dn = "uid=user1,ou=People," + baseDnStr;
    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
    + "objectClass: top\n" + "objectClass: person\n"
    + "objectClass: organizationalPerson\n"
    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
    + "homePhone: 951-245-7634\n"
    + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
    + "mobile: 027-085-0537\n"
    + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
    + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
    + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
    + "street: 17984 Thirteenth Street\n"
    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
    + "userPassword: password\n" + "initials: AA\n"
    + "entryUUID: " + user1entryUUID + "\n";
    try
    {
      personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
    }
    catch(Exception e)
    {
      fail(e.getMessage());
    }
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    return addMsg;
  }
  /**
   * SingleRS tests basic features of generationID
   * with one single Replication Server.
   *
   * @throws Exception
   */
  @Test(enabled=true)
  public void testSingleRS() throws Exception
  {
    String testCase = "testSingleRS";
    debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
    debugInfo(testCase + " Clearing DS1 backend");
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    try
    {
      long rgenId;
      long genId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      assertEquals(replServer1.getGenerationId(baseDn), -1);
      /*
       * Test  : empty replicated backend
       * Check : nothing is broken - no generationId generated
       */
      // Connect DS to RS with no data
      // Read generationId - should be not retrievable since no entry
      debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
      connectToReplServer(changelog1ID);
      Thread.sleep(1000);
      debugInfo(testCase + " Expect genId attribute to be not retrievable");
      genId = readGenId();
      assertEquals(genId,-1);
      debugInfo(testCase + " Expect genId to be set in memory on the replication " +
      " server side even if not wrote on disk/db since no change occured.");
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(rgenId, 3211313L);
      debugInfo(testCase + " Disconnecting DS1 from replServer1(" + changelog1ID + ")");
      disconnectFromReplServer(changelog1ID);
      /*
       * Test  : non empty replicated backend
       * Check : generationId correctly generated
       */
      // Now disconnect - create entries and reconnect
      // Test that generation has been added to the data.
      debugInfo(testCase + " add test entries to DS");
      this.addTestEntriesToDB(updatedEntries);
      connectToReplServer(changelog1ID);
      // Test that the generationId is written in the DB in the
      // root entry on the replica side
      genId = readGenId();
      assertTrue(genId != -1);
      assertTrue(genId != 3211313L);
      // Test that the generationId is set on the replication server side
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(genId, rgenId);
      /*
       * Test : Connection from 2nd broker with a different generationId
       * Check: We should receive an error message
       */
      try
      {
        broker2 = openReplicationSession(baseDn,
            server2ID, 100, getChangelogPort(changelog1ID),
            1000, !emptyOldChanges, genId+1);
      }
      catch(SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
      try
      {
        ReplicationMessage msg = broker2.receive();
        if (!(msg instanceof ErrorMessage))
        {
          fail("Broker connection is expected to receive an ErrorMessage."
              + msg);
        }
        ErrorMessage emsg = (ErrorMessage)msg;
        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      }
      catch(SocketTimeoutException se)
      {
        fail("Broker is expected to receive an ErrorMessage.");
      }
      /*
       * Test  : Connect with same generationId
       * Check : Must be accepted.
       */
      try
      {
        broker3 = openReplicationSession(baseDn,
            server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId);
      }
      catch(SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
      /*
       * Test  : generationID persistence in Replication server
       *         Shutdown/Restart Replication Server and redo connections
       *         with valid and invalid generationId
       * Check : same expected connections results
       */
      // The changes from broker2 should be ignored
      broker2.publish(createAddMsg());
      try
      {
        broker3.receive();
        fail("No update message is supposed to be received here.");
      }
      catch(SocketTimeoutException e)
      {
        // This is the expected result
      }
      // Now create a change that must be replicated
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
      try
      {
        ReplicationMessage msg = broker3.receive();
        debugInfo("Broker 3 received expected update msg" + msg);
      }
      catch(SocketTimeoutException e)
      {
        fail("Update message is supposed to be received.");
      }
      long genIdBeforeShut = replServer1.getGenerationId(baseDn);
      debugInfo("Shutdown replServer1");
      broker2.stop();
      broker2 = null;
      broker3.stop();
      broker3 = null;
      replServer1.shutdown();
      replServer1 = null;
      debugInfo("Create again replServer1");
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      debugInfo("Delay to allow DS to reconnect to replServer1");
      Thread.sleep(200);
      long genIdAfterRestart = replServer1.getGenerationId(baseDn);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
      assertTrue(replServer1!=null, "Replication server creation failed.");
      assertTrue(genIdBeforeShut == genIdAfterRestart,
      "generationId is expected to have the same value after replServer1 restart");
      try
      {
        debugInfo("Create again broker2");
        broker2 = openReplicationSession(baseDn,
            server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId);
        assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server");
        debugInfo("Create again broker3");
        broker3 = openReplicationSession(baseDn,
            server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId);
        assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server");
      }
      catch(SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
      /*
       *
       * FIXME Should clearJEBackend() regenerate generationId and do a start
       *       against ReplicationServer ?
       */
      /*
       * Test: Reset the replication server in order to allow new data set.
       */
      Entry taskReset = TestCaseUtils.makeEntry(
          "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-reset-generation-id",
          "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
          "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
      debugInfo("Reset generationId");
      addTask(taskReset, ResultCode.SUCCESS, null);
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(200);
      // TODO: Test that replication server db has been cleared
      assertEquals(replServer1.getGenerationId(baseDn),
          -1, "Expected genId to be reset in replServer1");
      ReplicationMessage rcvmsg = broker2.receive();
      if (!(rcvmsg instanceof ErrorMessage))
      {
        fail("Broker2 is expected to receive an ErrorMessage " +
            " to signal degradation due to reset" + rcvmsg);
      }
      ErrorMessage emsg = (ErrorMessage)rcvmsg;
      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      rcvmsg = broker3.receive();
      if (!(rcvmsg instanceof ErrorMessage))
      {
        fail("Broker3 is expected to receive an ErrorMessage " +
            " to signal degradation due to reset" + rcvmsg);
      }
      emsg = (ErrorMessage)rcvmsg;
      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      rgenId = replServer1.getGenerationId(baseDn);
      assertTrue(rgenId==-1,"Expecting that genId has been reset in replServer1: rgenId="+rgenId);
      assertTrue(replServer1.getReplicationCache(baseDn, false).
          isDegradedDueToGenerationId(server1ID),
      "Expecting that DS is degraded since domain genId has been reset");
      assertTrue(replServer1.getReplicationCache(baseDn, false).
          isDegradedDueToGenerationId(server2ID),
      "Expecting that broker2 is degraded since domain genId has been reset");
      assertTrue(replServer1.getReplicationCache(baseDn, false).
          isDegradedDueToGenerationId(server3ID),
      "Expecting that broker3 is degraded since domain genId has been reset");
      // Now create a change that normally would be replicated
      // but will not be replicated here since DS and brokers are degraded
      String[] ent2 = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent2);
      try
      {
        ReplicationMessage msg = broker2.receive();
        fail("No update message is supposed to be received by degraded broker2" + msg);
      } catch(SocketTimeoutException e) { /* expected */ }
      try
      {
        ReplicationMessage msg = broker3.receive();
        fail("No update message is supposed to be received by degraded broker3"+ msg);
      } catch(SocketTimeoutException e) { /* expected */ }
      debugInfo("broker2 is publishing a change, " +
      "replServer1 expected to ignore this change.");
      broker2.publish(createAddMsg());
      try
      {
        ReplicationMessage msg = broker3.receive();
        fail("No update message is supposed to be received by degraded broker3"+ msg);
      } catch(SocketTimeoutException e) { /* expected */ }
      debugInfo("Launch an on-line import on DS.");
      genId=-1;
      Entry importTask = getTaskImport();
      addTask(importTask, ResultCode.SUCCESS, null);
      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(500);
      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
      genId = readGenId();
      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
          " after on-line import but genId=" + genId);
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
      // In S1 launch the total update to initialize S2
      addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
      // S2 should be re-initialized and have a new valid genId
      int receivedEntriesNb = this.receiveImport(broker2, server2ID, null);
      debugInfo("broker2 has been initialized from DS with #entries=" + receivedEntriesNb);
      debugInfo("Adding reset task to DS.");
      taskReset = TestCaseUtils.makeEntry(
          "dn: ds-task-id=resetgenid"+ UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-reset-generation-id",
          "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
          "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
      addTask(taskReset, ResultCode.SUCCESS, null);
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(200);
      debugInfo("Verifying that replServer1 has been reset.");
      assertEquals(replServer1.getGenerationId(baseDn), -1);
      debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(changelog1ID);
      postTest();
      debugInfo(testCase + " Clearing DS backend");
      ReplicationDomain.clearJEBackend(false,
          replDomain.getBackend().getBackendID(),
          baseDn.toNormalizedString());
      // At this moment, root entry of the domain has been removed so
      // genId is no more in the database ... but it has still the old
      // value in memory.
      int found = testEntriesInDb();
      replDomain.loadGenerationId();
      debugInfo("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " +
          stackTraceToSingleLineString(e));
    }
  }
  /**
  /**
   * SingleRS tests basic features of generationID
   * with more than one Replication Server.
   * The following test focus on:
   * - genId checking accross multiple starting RS (replication servers)
   * - genId setting propagation from one RS to the others
   * - genId reset   propagation from one RS to the others
   */
  @Test(enabled=true)
  public void testMultiRS() throws Exception
  {
    String testCase = "testMultiRS";
    long genId;
    debugInfo("Starting " + testCase);
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    debugInfo ("Creating 3 RS");
    replServer1 = createReplicationServer(changelog1ID, true, testCase);
    replServer2 = createReplicationServer(changelog2ID, true, testCase);
    replServer3 = createReplicationServer(changelog3ID, true, testCase);
    Thread.sleep(500);
    debugInfo("Connecting DS to replServer1");
    connectToReplServer(changelog1ID);
    Thread.sleep(1500);
    debugInfo("Expect genId are set in all replServers.");
    assertEquals(replServer1.getGenerationId(baseDn), 3211313L, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDn), 3211313L, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDn), 3211313L, " in replServer3");
    debugInfo("Disconnect DS from replServer1.");
    disconnectFromReplServer(changelog1ID);
    Thread.sleep(1000);
    debugInfo("Expect genId to be unset(-1) in all servers since no server is " +
        " connected and no change ever occured");
    assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3");
    debugInfo("Add entries to DS");
    this.addTestEntriesToDB(updatedEntries);
    debugInfo("Connecting DS to replServer2");
    connectToReplServer(changelog2ID);
    Thread.sleep(1000);
    debugInfo("Expect genIds to be set in all servers based on the added entries.");
    genId = readGenId();
    assertTrue(genId != -1);
    assertEquals(replServer1.getGenerationId(baseDn), genId);
    assertEquals(replServer2.getGenerationId(baseDn), genId);
    assertEquals(replServer3.getGenerationId(baseDn), genId);
    debugInfo("Connecting broker2 to replServer3 with a good genId");
    try
    {
      broker2 = openReplicationSession(baseDn,
          server2ID, 100, getChangelogPort(changelog3ID),
          1000, !emptyOldChanges, genId);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    debugInfo("Expecting that broker2 is not degraded since it has a correct genId");
    assertTrue(!replServer1.getReplicationCache(baseDn, false).
        isDegradedDueToGenerationId(server2ID));
    debugInfo("Disconnecting DS from replServer1");
    disconnectFromReplServer(changelog1ID);
    debugInfo("Expect all genIds to keep their value since broker2 is still connected.");
    assertEquals(replServer1.getGenerationId(baseDn), genId);
    assertEquals(replServer2.getGenerationId(baseDn), genId);
    assertEquals(replServer3.getGenerationId(baseDn), genId);
    debugInfo("Connecting broker2 to replServer1 with a bad genId");
    try
    {
      long badgenId=1;
      broker3 = openReplicationSession(baseDn,
          server3ID, 100, getChangelogPort(changelog1ID),
          1000, !emptyOldChanges, badgenId);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    debugInfo("Expecting that broker3 is degraded since it has a bad genId");
    assertTrue(replServer1.getReplicationCache(baseDn, false).
        isDegradedDueToGenerationId(server3ID));
    int found = testEntriesInDb();
    assertEquals(found, updatedEntries.length,
        " Entries present in DB :" + found +
        " Expected entries :" + updatedEntries.length);
    debugInfo("Connecting DS to replServer1.");
    connectToReplServer(changelog1ID);
    Thread.sleep(1000);
    debugInfo("Adding reset task to DS.");
    Entry taskReset = TestCaseUtils.makeEntry(
        "dn: ds-task-id=resetgenid"+ UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-reset-generation-id",
        "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
        "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
    addTask(taskReset, ResultCode.SUCCESS, null);
    waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
    Thread.sleep(500);
    debugInfo("Verifying that all replservers genIds have been reset.");
    genId = readGenId();
    assertEquals(replServer1.getGenerationId(baseDn), -1);
    assertEquals(replServer2.getGenerationId(baseDn), -1);
    assertEquals(replServer3.getGenerationId(baseDn), -1);
    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
    disconnectFromReplServer(changelog1ID);
    debugInfo("Cleaning entries");
    postTest();
    debugInfo("Successfully ending " + testCase);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @throws Exception
   */
  protected void postTest()
  {
    debugInfo("Post test cleaning.");
    // Clean brokers
    if (broker2 != null)
      broker2.stop();
    broker2 = null;
    if (broker3 != null)
      broker3.stop();
    broker3 = null;
    if (replServer1 != null)
      replServer1.shutdown();
    if (replServer2 != null)
      replServer2.shutdown();
    if (replServer2 != null)
      replServer2.shutdown();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
    super.cleanRealEntries();
    try
    {
      ReplicationDomain.clearJEBackend(false,
        replDomain.getBackend().getBackendID(),
        baseDn.toNormalizedString());
      // At this moment, root entry of the domain has been removed so
      // genId is no more in the database ... but it has still the old
      // value in memory.
      testEntriesInDb();
      replDomain.loadGenerationId();
    }
    catch (Exception e) {}
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.net.SocketTimeoutException;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
@@ -124,14 +125,14 @@
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  boolean externalDS = false;
  private static final short server1ID = 11;
  private static final short server2ID = 21;
  private static final short server3ID = 31;
  private static final short changelog1ID = 1;
  private static final short changelog2ID = 2;
  private static final short changelog3ID = 3;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short changelog1ID =  8;
  private static final short changelog2ID =  9;
  private static final short changelog3ID = 10;
  private static int[] replServerPort = new int[4];
  private static int[] replServerPort = new int[20];
  
  private DN baseDn;
  ReplicationBroker server2 = null;
@@ -140,7 +141,7 @@
  ReplicationServer changelog2 = null;
  ReplicationServer changelog3 = null;
  boolean emptyOldChanges = true;
  ReplicationDomain sd = null;
  ReplicationDomain replDomain = null;
  private void log(String s)
  {
@@ -169,7 +170,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode("dc=example,dc=com");
    updatedEntries = newLDIFEntries();
@@ -756,9 +756,14 @@
      servers.add("localhost:" + getChangelogPort(changelog2ID));
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      int chPort = getChangelogPort(changelogId);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
        new ReplServerFakeConfiguration(
            getChangelogPort(changelogId),
            "rsdbdirname" + getChangelogPort(changelogId),
            0,
            changelogId,
            0,
            100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
@@ -804,17 +809,18 @@
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
        // Clear the backend
        ReplicationDomain.clearJEBackend(false,
            sd.getBackend().getBackendID(),
            replDomain.getBackend().getBackendID(),
            baseDn.toNormalizedString());
      }
      if (sd != null)
      if (replDomain != null)
      {
         log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
         assertTrue(!replDomain.ieRunning(),
           "ReplicationDomain: Import/Export is not expected to be running");
      }
    }
    catch(Exception e)
@@ -890,7 +896,7 @@
      // Test import result in S1
      testEntriesInDb();
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -929,7 +935,7 @@
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending "+testCase);
}
@@ -968,7 +974,7 @@
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending " + testCase);
@@ -1012,7 +1018,7 @@
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending " + testCase);
@@ -1049,7 +1055,7 @@
      // Test that entries have been imported in S1
      testEntriesInDb();
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1103,7 +1109,7 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1172,7 +1178,7 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1231,25 +1237,25 @@
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationCache(baseDn).
    List<String> l1 = changelog1.getReplicationCache(baseDn, false).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    
    List<String> l2;
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    assertEquals(l2.get(1), String.valueOf(server2ID));
        
    List<String> l3;
    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
    l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
@@ -1257,7 +1263,7 @@
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
@@ -1266,7 +1272,7 @@
    broker2.stop();
    broker3.stop();
    cleanEntries();
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
@@ -1313,7 +1319,7 @@
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    changelog2.shutdown();
    changelog2 = null;
@@ -1335,42 +1341,64 @@
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(1000);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(1000);
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    log(testCase + " Will add entries");
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to Repl Server 2
    if (server2 == null)
    {
      log(testCase + " Will connect server 2 to " + changelog2ID);
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID),
        1000, emptyOldChanges);
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
    // Connect a broker acting as server 3 to Repl Server 3
    log(testCase + " Will create replServer " + changelog3ID);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(500);
    if (server3 == null)
    {
      log(testCase + " Will connect server 3 to " + changelog3ID);
      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog3ID),
        1000, emptyOldChanges);
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
    Thread.sleep(3000);
    Thread.sleep(500);
    // S2 sends init request
    // S3 sends init request
    log(testCase + " server 3 Will send reqinit to " + server1ID);
    InitializeRequestMessage initMsg =
      new InitializeRequestMessage(baseDn, server2ID, server1ID);
    server2.publish(initMsg);
      new InitializeRequestMessage(baseDn, server3ID, server1ID);
    server3.publish(initMsg);
    // S2 should receive target, entries & done
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    // S3 should receive target, entries & done
    log(testCase + " Will verify server 3 has received expected entries");
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    cleanEntries();
    while(true)
    {
      try
      {
        ReplicationMessage msg = server3.receive();
        fail("Receive unexpected message " + msg);
      }
      catch(SocketTimeoutException e)
      {
        // Test is a success
        break;
      }
    }
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
@@ -1419,9 +1447,10 @@
    addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    log("Successfully ending "+testCase);
@@ -1458,9 +1487,10 @@
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    log("Successfully ending "+testCase);
@@ -1554,7 +1584,7 @@
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        null);
    cleanEntries();
    afterTest();
    log("Successfully ending "+testCase);
@@ -1563,23 +1593,30 @@
  /**
   * Disconnect broker and remove entries from the local DB
   */
  protected void cleanEntries()
  protected void afterTest()
  {
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    // Clean brokers
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server2 = null;
    }
    if (server3 != null)
    {
      server3.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server3 = null;
    }
    super.cleanRealEntries();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -49,6 +49,7 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
@@ -349,9 +350,10 @@
    ProtocolVersion.setCurrentVersion((short)2);
    ReplicationBroker broker = new ReplicationBroker(
        new ServerState(),
        baseDn,
        new ServerState(),
        baseDn,
        (short) 13, 0, 0, 0, 0, 1000, 0,
        ReplicationTestCase.getGenerationId(baseDn),
        getReplSessionSecurity());
@@ -369,5 +371,11 @@
    // Check broker negociated version
    pversion = broker.getProtocolVersion();
    assertEquals(pversion, 0);
  }
    broker.stop();
    logError(Message.raw(
        Category.SYNC, Severity.INFORMATION,
        "Ending Replication ProtocolWindowTest : protocolVersion"));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,13 +28,13 @@
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import org.opends.server.config.ConfigException;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -42,13 +42,14 @@
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Message;
import org.opends.messages.Category;
import org.opends.messages.Severity;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
@@ -58,7 +59,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
@@ -136,13 +140,50 @@
  }
  /**
   * Open a replicationServer session to the local ReplicationServer.
   * Retrieves the domain associated to the baseDn, and the value of the generationId
   * of this domain. If the domain does not exist, returns the default hard-coded\
   * value of the generationId corresponding to 'no entry'.
   *
   * @param baseDn The baseDn for which we want the generationId
   * @return The value of the generationId.
   */
  static protected long getGenerationId(DN baseDn)
  {
    // This is the value of the generationId computed by the server when the
    // suffix is empty.
    long genId = 3276850;
    try
    {
      ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      genId = replDomain.getGenerationId();
    }
    catch(Exception e) {}
    return genId;
  }
  /**
   * Open a replicationServer session to the local ReplicationServer.
   * The generation is read from the replicationDomain object. If it
   * does not exist, take the 'empty backend' generationID.
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception
          throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, emptyOldChanges, getGenerationId(baseDn));
  }
  /**
   * Open a replicationServer session to the local ReplicationServer
   * providing the generationId.
   */
  protected ReplicationBroker openReplicationSession(
        final DN baseDn, short serverId, int window_size,
        int port, int timeout, boolean emptyOldChanges,
        long generationId)
  throws Exception, SocketException
  {
    ServerState state;
    if (emptyOldChanges)
@@ -151,8 +192,8 @@
       state = new ServerState();
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
        getReplSessionSecurity());
        state, baseDn, serverId, 0, 0, 0, 0,
        window_size, 0, generationId, getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
@@ -170,7 +211,15 @@
      {
        while (true)
        {
          broker.receive();
          ReplicationMessage rMsg = broker.receive();
          if (rMsg instanceof ErrorMessage)
          {
            ErrorMessage eMsg = (ErrorMessage)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
@@ -184,16 +233,30 @@
  }
  /**
   * Open a replicationServer session to the local ReplicationServer
   * with a default value generationId.
   *
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
    throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, state, getGenerationId(baseDn));
  }
  /**
   * Open a new session to the ReplicationServer
   * starting with a given ServerState.
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception
      int port, int timeout, ServerState state, long generationId)
          throws Exception, SocketException
  {
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId,
        getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -213,7 +276,18 @@
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
          throws Exception
      throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
        getGenerationId(baseDn));
  }
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
        int port, int timeout, int maxSendQueue, int maxRcvQueue,
        boolean emptyOldChanges, long generationId)
            throws Exception, SocketException
  {
    ServerState state;
    if (emptyOldChanges)
@@ -223,7 +297,7 @@
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, maxRcvQueue, 0,
        maxSendQueue, 0, window_size, 0,
        maxSendQueue, 0, window_size, 0, generationId,
        getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -240,7 +314,15 @@
      {
        while (true)
        {
          broker.receive();
          ReplicationMessage rMsg = broker.receive();
          if (rMsg instanceof ErrorMessage)
          {
            ErrorMessage eMsg = (ErrorMessage)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
@@ -264,13 +346,22 @@
      while (true)
      {
        DN dn = configEntryList.removeLast();
             logError(Message.raw(Category.SYNC, Severity.NOTICE,
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
                 "cleaning config entry " + dn));
        op = new DeleteOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op.run();
        if ((op.getResultCode() != ResultCode.SUCCESS) &&
            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
        {
          logError(Message.raw(Category.SYNC, Severity.NOTICE,
                   "ReplicationTestCase/Cleaning config entries" +
                   "DEL " + dn +
                   " failed " + op.getResultCode().getResultCodeName()));
        }
      }
    }
    catch (NoSuchElementException e) {
@@ -293,14 +384,23 @@
      while (true)
      {
        DN dn = entryList.removeLast();
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            "cleaning entry " + dn));
        op = new DeleteOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op = new DeleteOperationBasis(connection,
               InternalClientConnection.nextOperationID(),
               InternalClientConnection.nextMessageID(),
               null,
               dn);
        op.run();
        if ((op.getResultCode() != ResultCode.SUCCESS) &&
            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
        {
          logError(Message.raw(Category.SYNC, Severity.NOTICE,
                   "ReplicationTestCase/Cleaning entries" +
                   "DEL " + dn +
                   " failed " + op.getResultCode().getResultCodeName()));
        }
      }
    }
    catch (NoSuchElementException e) {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -158,7 +158,7 @@
      ResultCode code = modOp.getResultCode();
      assertTrue(code.equals(ResultCode.SUCCESS),
                 "The original operation failed");
                 "The original operation failed: " + code.getResultCodeName());
      // See if the client has received the msg
      ReplicationMessage msg = broker.receive();
@@ -217,7 +217,7 @@
  public void replaySchemaChange() throws Exception
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : pushSchemaChange "));
        "Starting replication test : replaySchemaChange "));
    final DN baseDn = DN.decode("cn=schema");
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
@@ -110,7 +110,8 @@
      // chek that the operation was successful.
      // check that the update failed.
      assertEquals(ResultCode.SUCCESS, op.getResultCode());
      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
          op.getAdditionalLogMessage().toString());
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -107,8 +107,8 @@
    ChangeNumber cn1 = gen1.newChangeNumber();
    ChangeNumber cn2 = gen2.newChangeNumber();
    state.update(cn1);
    state.update(cn2);
    assertEquals(state.update(cn1), true);
    assertEquals(state.update(cn2), true);
    state.save();
@@ -120,6 +120,12 @@
        "cn1 has not been saved or loaded correctly for " + dn);
    assertEquals(cn2Saved, cn2,
        "cn2 has not been saved or loaded correctly for " + dn);
    state.clear();
    stateSaved = new PersistentServerState(baseDn);
    cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    assertEquals(cn1Saved, null,
        "cn1 has not been saved after clear for " + dn);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -493,7 +493,8 @@
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
        window, window, window, window, window, window, state, (short)1, true);
        window, window, window, window, window, window, state, (short)1,
        (long)1, true);
    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -507,6 +508,7 @@
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
  }
  @DataProvider(name="changelogStart")
@@ -527,7 +529,7 @@
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
        url, baseDN, window, state, (short)1, true);
        url, baseDN, window, state, (short)1, (long)1, true);
    ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
@@ -536,6 +538,7 @@
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
  }
@@ -572,9 +575,11 @@
    List<String> connectedServers = new ArrayList<String>(0);
    connectedServers.add("s1");
    connectedServers.add("s2");
    ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
    ReplServerInfoMessage msg =
      new ReplServerInfoMessage(connectedServers, 13);
    ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
    assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,7 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -118,4 +118,75 @@
    TestCaseUtils.deleteDirectory(testRoot);
  }
  /*
   * Test the feature of clearing a dbHandler used by a replication server.
   * The clear feature is used when a replication server receives a request
   * to reset the generationId of a given domain.
   */
  @Test()
  void testDbHandlerClear() throws Exception
  {
    TestCaseUtils.startServer();
    //  find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();
    // configure a ReplicationServer.
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(changelogPort, null, 0,
                                     2, 0, 100, null);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    // create or clean a directory for the dbHandler
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "build" + File.separator +
                  "unit-tests" + File.separator + "dbHandler";
    File testRoot = new File(path);
    if (testRoot.exists())
    {
      TestCaseUtils.deleteDirectory(testRoot);
    }
    testRoot.mkdirs();
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
    // Creates changes added to the dbHandler
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.newChangeNumber();
    ChangeNumber changeNumber2 = gen.newChangeNumber();
    ChangeNumber changeNumber3 = gen.newChangeNumber();
    DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid");
    DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid");
    DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid");
    // Add the changes
    handler.add(update1);
    handler.add(update2);
    handler.add(update3);
    // Check they are here
    assertEquals(changeNumber1, handler.getFirstChange());
    assertEquals(changeNumber3, handler.getLastChange());
    // Clear ...
    handler.clear();
    // Check the db is cleared.
    assertEquals(null, handler.getFirstChange());
    assertEquals(null, handler.getLastChange());
    handler.shutdown();
    dbEnv.shutdown();
    replicationServer.shutdown();
    TestCaseUtils.deleteDirectory(testRoot);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -30,6 +30,8 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.net.InetAddress;
@@ -41,8 +43,12 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -68,6 +74,8 @@
public class ReplicationServerTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  /**
   * The replicationServer that will be used in this test.
   */
@@ -105,6 +113,15 @@
    replicationServer = new ReplicationServer(conf);
  }
  private void debugInfo(String s)
  {
    // logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  /**
   * Basic test of the replicationServer code :
   *  Connect 2 clients to the replicationServer and exchange messages
@@ -116,6 +133,7 @@
  @Test()
  public void changelogBasic() throws Exception
  {
    debugInfo("Starting changelogBasic");
    ReplicationBroker server1 = null;
    ReplicationBroker server2 = null;
@@ -188,7 +206,7 @@
        fail("ReplicationServer basic : incorrect message type received.");
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       * Send and receive a Delete Msg from server 2 to server 1
       */
      msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
@@ -226,6 +244,7 @@
      if (server2 != null)
        server2.stop();
    }
    debugInfo("Ending changelogBasic");
  }
  /**
@@ -235,6 +254,7 @@
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClient() throws Exception
  {
    debugInfo("Starting newClient");
    ReplicationBroker broker = null;
    try {
@@ -244,7 +264,7 @@
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("ReplicationServer basic transmission failed");
        fail("ReplicationServer basic transmission failed:" + msg2);
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -258,6 +278,7 @@
      if (broker != null)
        broker.stop();
    }
    debugInfo("Ending newClient");
  }
@@ -277,11 +298,13 @@
    try {
      broker =
        openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, replicationServerPort, 1000, state);
                             100, replicationServerPort, 5000, state);
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("ReplicationServer basic transmission failed");
      {
        fail("ReplicationServer basic transmission failed:" + msg2);
      }
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -304,6 +327,7 @@
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithFirstChanges() throws Exception
  {
    debugInfo("Starting newClientWithFirstChanges");
    /*
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
@@ -313,6 +337,7 @@
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, secondChangeNumberServer1);
    debugInfo("Ending newClientWithFirstChanges");
  }
  /**
@@ -792,7 +817,7 @@
      ServerStartMessage msg =
        new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
            0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
            ProtocolVersion.currentVersion(), sslEncryption);
            ProtocolVersion.currentVersion(), 0, sslEncryption);
      session.publish(msg);
      // Read the Replication Server state from the ReplServerStartMessage that
@@ -819,10 +844,13 @@
      // send a ServerStartMessage containing the ServerState that was just
      // received.
      DN baseDn = DN.decode("dc=example,dc=com");
      msg = new ServerStartMessage(
          (short) 1724, DN.decode("dc=example,dc=com"),
          (short) 1724, baseDn,
          0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
          ProtocolVersion.currentVersion(), sslEncryption);
          ProtocolVersion.currentVersion(),
          ReplicationTestCase.getGenerationId(baseDn),
          sslEncryption);
      session.publish(msg);
      // Read the ReplServerStartMessage that come back.