From 6951474612af7966457e7fd1cb43a4c9626f7e68 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 17 Sep 2007 08:00:37 +0000
Subject: [PATCH] Fixes repl init and total update #2253 #845 #1733

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                             |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                             |   37 ++++
 opendj-sdk/opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java                                      |  135 ++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java                           |   62 ++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java                              |   95 +++++------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                 |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                  |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                             |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java             |  101 ++++++------
 10 files changed, 333 insertions(+), 114 deletions(-)

diff --git a/opendj-sdk/opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java b/opendj-sdk/opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java
index b0c1bef..175455d 100644
--- a/opendj-sdk/opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java
+++ b/opendj-sdk/opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java
@@ -3974,6 +3974,7 @@
                         ne), ne);
       }
     }
+    resetGenerationId(ctx, suffixDn, true, sourceServerDisplay);
   }
 
   /**
@@ -4023,4 +4024,138 @@
   {
     return (random.nextInt() & modulo);
   }
+
+  private void resetGenerationId(InitialLdapContext ctx,
+      String suffixDn, boolean displayProgress, String sourceServerDisplay)
+  throws ApplicationException
+  {
+    boolean taskCreated = false;
+    int i = 1;
+    boolean isOver = false;
+    String dn = null;
+    BasicAttributes attrs = new BasicAttributes();
+    Attribute oc = new BasicAttribute("objectclass");
+    oc.add("top");
+    oc.add("ds-task");
+    oc.add("ds-task-reset-generation-id");
+    attrs.put(oc);
+    attrs.put("ds-task-class-name",
+        "org.opends.server.tasks.SetGenerationIdTask");
+    attrs.put("ds-task-reset-generation-id-domain-base-dn", suffixDn);
+    while (!taskCreated)
+    {
+      String id = "quicksetup-reset-generation-id-"+i;
+      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
+      attrs.put("ds-task-id", id);
+      try
+      {
+        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
+        taskCreated = true;
+        LOG.log(Level.INFO, "created task entry: "+attrs);
+        dirCtx.close();
+      }
+      catch (NameAlreadyBoundException x)
+      {
+      }
+      catch (NamingException ne)
+      {
+        LOG.log(Level.SEVERE, "Error creating task "+attrs, ne);
+        throw new ApplicationException(
+            ReturnCode.APPLICATION_ERROR,
+                getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
+                        sourceServerDisplay
+                ), ne), ne);
+      }
+      i++;
+    }
+    // Wait until it is over
+    SearchControls searchControls = new SearchControls();
+    searchControls.setCountLimit(1);
+    searchControls.setSearchScope(
+        SearchControls. OBJECT_SCOPE);
+    String filter = "objectclass=*";
+    searchControls.setReturningAttributes(
+        new String[] {
+            "ds-task-log-message",
+            "ds-task-state"
+        });
+    Message lastDisplayedMsg = null;
+    String lastLogMsg = null;
+    long lastTimeMsgDisplayed = -1;
+    while (!isOver)
+    {
+      try
+      {
+        Thread.sleep(500);
+      }
+      catch (Throwable t)
+      {
+      }
+      try
+      {
+        NamingEnumeration res = ctx.search(dn, filter, searchControls);
+        SearchResult sr = (SearchResult)res.next();
+        String logMsg = getFirstValue(sr, "ds-task-log-message");
+        if (logMsg != null)
+        {
+          if (!logMsg.equals(lastLogMsg))
+          {
+            LOG.log(Level.INFO, logMsg);
+            lastLogMsg = logMsg;
+          }
+        }
+        InstallerHelper helper = new InstallerHelper();
+        String state = getFirstValue(sr, "ds-task-state");
+
+        if (helper.isDone(state) || helper.isStoppedByError(state))
+        {
+          isOver = true;
+          Message errorMsg;
+          if (lastLogMsg == null)
+          {
+            errorMsg = INFO_ERROR_DURING_INITIALIZATION_NO_LOG.get(
+                    sourceServerDisplay, state, sourceServerDisplay);
+          }
+          else
+          {
+            errorMsg = INFO_ERROR_DURING_INITIALIZATION_LOG.get(
+                    sourceServerDisplay, lastLogMsg, state,
+                    sourceServerDisplay);
+          }
+
+          if (helper.isCompletedWithErrors(state))
+          {
+            notifyListeners(getFormattedWarning(errorMsg));
+          }
+          else if (!helper.isSuccessful(state) ||
+              helper.isStoppedByError(state))
+          {
+            ApplicationException ae = new ApplicationException(
+                ReturnCode.APPLICATION_ERROR, errorMsg,
+                null);
+            throw ae;
+          }
+          else if (displayProgress)
+          {
+            notifyListeners(getFormattedProgress(
+                INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
+          }
+        }
+      }
+      catch (NameNotFoundException x)
+      {
+        isOver = true;
+        notifyListeners(getFormattedProgress(
+            INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
+      }
+      catch (NamingException ne)
+      {
+        throw new ApplicationException(
+            ReturnCode.APPLICATION_ERROR,
+                getThrowableMsg(INFO_ERROR_POOLING_INITIALIZATION.get(
+                        sourceServerDisplay),
+                        ne), ne);
+      }
+    }
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a071d12..1fa3f6c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -122,7 +122,7 @@
    */
   private boolean connectionError = false;
 
-  private Object connectPhaseLock = new Object();
+  private final Object connectPhaseLock = new Object();
 
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index c1847bc..e392b3c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2415,7 +2415,7 @@
   public void resetGenerationId()
   {
     requestedResetSinceLastStart = true;
-    ResetGenerationId genIdMessage = new ResetGenerationId();
+    ResetGenerationId genIdMessage = new ResetGenerationId(this.generationId);
     broker.publish(genIdMessage);
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
index 0945377..ace1403 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
@@ -26,7 +26,10 @@
  */
 package org.opends.server.replication.protocol;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
 import java.util.zip.DataFormatException;
 
 
@@ -38,13 +41,15 @@
     Serializable
 {
   private static final long serialVersionUID = 7657049716115572226L;
-
+  private long generationId;
 
   /**
    * Creates a new message.
+   * @param generationId The new reference value of the generationID.
    */
-  public ResetGenerationId()
+  public ResetGenerationId(long generationId)
   {
+    this.generationId = generationId;
   }
 
   /**
@@ -57,9 +62,24 @@
    */
   public ResetGenerationId(byte[] in) throws DataFormatException
   {
-    if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
-      throw new
-      DataFormatException("input is not a valid GenerationId Message");
+    try
+    {
+      if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
+        throw new
+        DataFormatException("input is not a valid GenerationId Message");
+
+      int pos = 1;
+
+      /* read the generationId */
+      int length = getNextLength(in, pos);
+      generationId = Long.valueOf(new String(in, pos, length,
+      "UTF-8"));
+      pos += length +1;
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+
   }
 
   /**
@@ -68,11 +88,33 @@
   @Override
   public byte[] getBytes()
   {
-    int length = 1;
-    byte[] resultByteArray = new byte[length];
+    try
+    {
+      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
 
-    /* put the type of the operation */
-    resultByteArray[0] = MSG_TYPE_RESET_GENERATION_ID;
-    return resultByteArray;
+      /* Put the message type */
+      oStream.write(MSG_TYPE_RESET_GENERATION_ID);
+
+      // Put the generationId
+      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
+      oStream.write(0);
+
+      return oStream.toByteArray();
+    }
+    catch (IOException e)
+    {
+      // never happens
+      return null;
+    }
+  }
+
+  /**
+   * Returns the generation Id set in this message.
+   * @return the value of the generation ID.
+   *
+   */
+  public long getGenerationId()
+  {
+    return this.generationId;
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 990eb1f..6fd8925 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -959,19 +959,6 @@
     }
 
     /**
-     * Sets the replication server informations for the provided
-     * handler from the provided ReplServerInfoMessage.
-     *
-     * @param handler The server handler from which the info was received.
-     * @param infoMsg The information message that was received.
-     */
-    public void setReplServerInfo(
-        ServerHandler handler, ReplServerInfoMessage infoMsg)
-    {
-      handler.setReplServerInfo(infoMsg);
-    }
-
-    /**
      * Sets the provided value as the new in memory generationId.
      *
      * @param generationId The new value of generationId.
@@ -1007,20 +994,27 @@
      *
      * @param senderHandler The handler associated to the server
      *        that requested to reset the generationId.
+     * @param genIdMsg The reset generation ID msg received.
      */
-    public void resetGenerationId(ServerHandler senderHandler)
+    public void resetGenerationId(ServerHandler senderHandler,
+        ResetGenerationId genIdMsg)
     {
+      long newGenId = genIdMsg.getGenerationId();
+
       if (debugEnabled())
         TRACER.debugInfo(
           "In " + this.replicationServer.getMonitorInstanceName() +
           " baseDN=" + baseDn +
-          " RCache.resetGenerationId");
+          " RCache.resetGenerationId received new ref genId="  + newGenId);
 
       // Notifies the others LDAP servers that from now on
       // they have the bad generationId
       for (ServerHandler handler : connectedServers.values())
       {
-        handler.resetGenerationId();
+        if (newGenId != handler.getGenerationId())
+        {
+          handler.resetGenerationId();
+        }
       }
 
       // Propagates the reset message to the others replication servers
@@ -1031,7 +1025,7 @@
         {
           try
           {
-            handler.sendGenerationId(new ResetGenerationId());
+            handler.sendGenerationId(genIdMsg);
           }
           catch (IOException e)
           {
@@ -1041,45 +1035,48 @@
         }
       }
 
-      // Reset the localchange and state db for the current domain
-      synchronized (sourceDbHandlers)
+      if (this.generationId != newGenId)
       {
-        for (DbHandler dbHandler : sourceDbHandlers.values())
+        // Reset the localchange and state db for the current domain
+        synchronized (sourceDbHandlers)
         {
-          try
+          for (DbHandler dbHandler : sourceDbHandlers.values())
           {
-            dbHandler.clear();
+            try
+            {
+              dbHandler.clear();
+            }
+            catch (Exception e)
+            {
+              // TODO: i18n
+              logError(Message.raw(
+                  "Exception caught while clearing dbHandler:" +
+                  e.getLocalizedMessage()));
+            }
           }
-          catch (Exception e)
-          {
-            // TODO: i18n
-            logError(Message.raw(
-                "Exception caught while clearing dbHandler:" +
-                e.getLocalizedMessage()));
-          }
+          sourceDbHandlers.clear();
+
+          if (debugEnabled())
+            TRACER.debugInfo(
+                "In " + this.replicationServer.getMonitorInstanceName() +
+                " baseDN=" + baseDn +
+            " The source db handler has been cleared");
         }
-        sourceDbHandlers.clear();
+        try
+        {
+          replicationServer.clearGenerationId(baseDn);
+        }
+        catch (Exception e)
+        {
+          // TODO: i18n
+          logError(Message.raw(
+              "Exception caught while clearing generationId:" +
+              e.getLocalizedMessage()));
+        }
 
-        if (debugEnabled())
-          TRACER.debugInfo(
-              "In " + this.replicationServer.getMonitorInstanceName() +
-              " baseDN=" + baseDn +
-              " The source db handler has been cleared");
+        // Reset the in memory domain generationId
+        generationId = newGenId;
       }
-      try
-      {
-        replicationServer.clearGenerationId(baseDn);
-      }
-      catch (Exception e)
-      {
-        // TODO: i18n
-        logError(Message.raw(
-            "Exception caught while clearing generationId:" +
-            e.getLocalizedMessage()));
-      }
-
-      // Reset the in memory domain generationId
-      generationId = -1;
     }
 
     /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 7a445bf..57f9a7e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -130,6 +130,14 @@
   // ID of the backend
   private static final String backendId = "replicationChanges";
 
+  // At startup, the listen thread wait on this flag for the connet
+  // thread to look for other servers in the topology.
+  // TODO when a replication server is out of date (has old changes
+  // to receive from other servers, the listen thread should not accept
+  // connection from ldap servers. (issue 1302)
+  private boolean connectedInTopology = false;
+  private final Object connectedInTopologyLock = new Object();
+
   /**
    * The tracer object for the debug logger.
    */
@@ -211,6 +219,23 @@
   void runListen()
   {
     Socket newSocket;
+
+    // wait for the connect thread to find other replication
+    // servers in the topology before starting to accept connections
+    // from the ldap servers.
+    synchronized (connectedInTopologyLock)
+    {
+      if (connectedInTopology == false)
+      {
+        try
+        {
+          connectedInTopologyLock.wait(1000);
+        } catch (InterruptedException e)
+        {
+        }
+      }
+    }
+
     while ((shutdown == false) && (stopListen  == false))
     {
       // Wait on the replicationServer port.
@@ -286,6 +311,15 @@
           }
         }
       }
+      synchronized (connectedInTopologyLock)
+      {
+        // wake up the listen thread if necessary.
+        if (connectedInTopology == false)
+        {
+          connectedInTopologyLock.notify();
+          connectedInTopology = true;
+        }
+      }
       try
       {
         synchronized (this)
@@ -391,7 +425,7 @@
       // FIXME : Is it better to have the time to receive the ReplServerInfo
       // from all the other replication servers since this info is necessary
       // to route an early received total update request.
-
+      try { Thread.sleep(300);} catch(Exception e) {}
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " creates listen threads");
@@ -798,6 +832,7 @@
         // Add the replication backend
         DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
       }
+      ldifImportConfig.close();
     }
     catch(Exception e)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index cc97ccf..41fcfae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1617,7 +1617,7 @@
     *
     * @param infoMsg The information message.
     */
-   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
+   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
    {
      if (debugEnabled())
        TRACER.debugInfo("In " + replicationCache.getReplicationServer().
@@ -1749,6 +1749,7 @@
    public void sendGenerationId(ResetGenerationId msg)
    throws IOException
    {
+     generationId = msg.getGenerationId();
      session.publish(msg);
    }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index d582bb7..69d911d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -182,7 +182,7 @@
         else if (msg instanceof ResetGenerationId)
         {
           ResetGenerationId genIdMsg = (ResetGenerationId) msg;
-          replicationCache.resetGenerationId(this.handler);
+          replicationCache.resetGenerationId(this.handler, genIdMsg);
         }
         else if (msg instanceof WindowProbe)
         {
@@ -192,7 +192,7 @@
         else if (msg instanceof ReplServerInfoMessage)
         {
           ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
-          handler.setReplServerInfo(infoMsg);
+          handler.receiveReplServerInfo(infoMsg);
 
           if (debugEnabled())
           {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 523ba60..4b100b5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -719,7 +719,6 @@
       long genId;
 
       replServer1 = createReplicationServer(changelog1ID, false, testCase);
-      assertEquals(replServer1.getGenerationId(baseDn), -1);
 
       /*
        * Test  : empty replicated backend
@@ -892,6 +891,13 @@
        * Test: Reset the replication server in order to allow new data set.
        */
 
+      debugInfo("Launch an on-line import on DS.");
+      genId=-1;
+      Entry importTask = getTaskImport();
+      addTask(importTask, ResultCode.SUCCESS, null);
+      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
+      Thread.sleep(500);
+
       Entry taskReset = TestCaseUtils.makeEntry(
           "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() +
           ",cn=Scheduled Tasks,cn=Tasks",
@@ -908,33 +914,17 @@
 
       // TODO: Test that replication server db has been cleared
 
-      assertEquals(replServer1.getGenerationId(baseDn),
-          -1, "Expected genId to be reset in replServer1");
+      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
+      genId = readGenId();
+      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
+          " after on-line import but genId=" + genId);
 
-      ReplicationMessage rcvmsg = broker2.receive();
-      if (!(rcvmsg instanceof ErrorMessage))
-      {
-        fail("Broker2 is expected to receive an ErrorMessage " +
-            " to signal degradation due to reset" + rcvmsg);
-      }
-      ErrorMessage emsg = (ErrorMessage)rcvmsg;
-      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+      rgenId = replServer1.getGenerationId(baseDn);     
+      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
 
-      rcvmsg = broker3.receive();
-      if (!(rcvmsg instanceof ErrorMessage))
-      {
-        fail("Broker3 is expected to receive an ErrorMessage " +
-            " to signal degradation due to reset" + rcvmsg);
-      }
-      emsg = (ErrorMessage)rcvmsg;
-      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
-
-      rgenId = replServer1.getGenerationId(baseDn);
-      assertTrue(rgenId==-1,"Expecting that genId has been reset in replServer1: rgenId="+rgenId);
-
-      assertTrue(replServer1.getReplicationCache(baseDn, false).
+      assertTrue(!replServer1.getReplicationCache(baseDn, false).
           isDegradedDueToGenerationId(server1ID),
-      "Expecting that DS is degraded since domain genId has been reset");
+      "Expecting that DS is not degraded since domain genId has been reset");
 
       assertTrue(replServer1.getReplicationCache(baseDn, false).
           isDegradedDueToGenerationId(server2ID),
@@ -946,8 +936,39 @@
 
       // Now create a change that normally would be replicated
       // but will not be replicated here since DS and brokers are degraded
-      String[] ent2 = { createEntry(UUID.randomUUID()) };
-      this.addTestEntriesToDB(ent2);
+      String[] ent3 = { createEntry(UUID.randomUUID()) };
+      this.addTestEntriesToDB(ent3);
+
+      try
+      {
+        ReplicationMessage msg = broker2.receive();
+        if (!(msg instanceof ErrorMessage))
+        {
+          fail("Broker 2 connection is expected to receive an ErrorMessage."
+              + msg);
+        }
+        ErrorMessage emsg = (ErrorMessage)msg;
+        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+      }
+      catch(SocketTimeoutException se)
+      {
+        fail("Broker 2 is expected to receive an ErrorMessage.");
+      }
+      try
+      {
+        ReplicationMessage msg = broker3.receive();
+        if (!(msg instanceof ErrorMessage))
+        {
+          fail("Broker 3 connection is expected to receive an ErrorMessage."
+              + msg);
+        }
+        ErrorMessage emsg = (ErrorMessage)msg;
+        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+      }
+      catch(SocketTimeoutException se)
+      {
+        fail("Broker 3 is expected to receive an ErrorMessage.");
+      }
 
       try
       {
@@ -969,23 +990,7 @@
         ReplicationMessage msg = broker3.receive();
         fail("No update message is supposed to be received by degraded broker3"+ msg);
       } catch(SocketTimeoutException e) { /* expected */ }
-
-
-      debugInfo("Launch an on-line import on DS.");
-      genId=-1;
-      Entry importTask = getTaskImport();
-      addTask(importTask, ResultCode.SUCCESS, null);
-      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
-      Thread.sleep(500);
-
-      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
-      genId = readGenId();
-      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
-          " after on-line import but genId=" + genId);
-
-      rgenId = replServer1.getGenerationId(baseDn);     
-      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
-
+    
       // In S1 launch the total update to initialize S2
       addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
 
@@ -1008,7 +1013,7 @@
       Thread.sleep(200);
 
       debugInfo("Verifying that replServer1 has been reset.");
-      assertEquals(replServer1.getGenerationId(baseDn), -1);
+      assertEquals(replServer1.getGenerationId(baseDn), rgenId);
 
       debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
       disconnectFromReplServer(changelog1ID);
@@ -1163,9 +1168,9 @@
     
     debugInfo("Verifying that all replservers genIds have been reset.");
     genId = readGenId();
-    assertEquals(replServer1.getGenerationId(baseDn), -1);
-    assertEquals(replServer2.getGenerationId(baseDn), -1);
-    assertEquals(replServer3.getGenerationId(baseDn), -1);
+    assertEquals(replServer2.getGenerationId(baseDn), genId);
+    assertEquals(replServer2.getGenerationId(baseDn), genId);
+    assertEquals(replServer3.getGenerationId(baseDn), genId);
 
     debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
     disconnectFromReplServer(changelog1ID);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 561ad28..dcb3911 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -48,6 +48,9 @@
 import org.opends.server.backends.task.TaskState;
 import org.opends.server.core.ModifyDNOperationBasis;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -75,6 +78,7 @@
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.RDN;
 import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchScope;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
 import org.testng.annotations.AfterClass;
@@ -1198,4 +1202,4 @@
      catch(Exception e) {};
      return l;
    }
-}
+ }

--
Gitblit v1.10.0