mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
07.57.2015 b7465fdf4336e1d2f4c7dd231940b4034b1abb0c
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
@@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -46,18 +47,28 @@
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TestTimer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.*;
import static org.mockito.Mockito.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
@@ -69,7 +80,6 @@
@SuppressWarnings("javadoc")
public class StateMachineTest extends ReplicationTestCase
{
  /** Server id definitions. */
  private static final String EXAMPLE_DN = "dc=example,dc=com";
  private static DN EXAMPLE_DN_;
@@ -96,6 +106,22 @@
    }
  }
  private static void shutdown(BrokerReader reader)
  {
    if (reader != null)
    {
      reader.shutdown();
    }
  }
  private static void shutdown(BrokerWriter writer)
  {
    if (writer != null)
    {
      writer.shutdown();
    }
  }
  private void initTest() throws IOException
  {
    rs1Port = -1;
@@ -122,67 +148,41 @@
    rs1Port = -1;
  }
  /**
   * Check connection of the provided ds to the
   * replication server. Waits for connection to be ok up to secTimeout seconds
   * before failing.
   */
  private void checkConnection(int secTimeout, int dsId) throws Exception
  /** Waits until the provided ds is connected to the replication server. */
  private void waitUntiConnected(final int dsId) throws Exception
  {
    ReplicationBroker rb = null;
    LDAPReplicationDomain rd = null;
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(30, SECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      @Override
      public Void call() throws Exception
      {
        assertTrue(isConnected(dsId), "checkConnection: DS " + dsId + " is not connected to the RS");
        return null;
      }
    });
  }
  private boolean isConnected(final int dsId)
  {
    switch (dsId)
    {
      case DS1_ID:
        rd = ds1;
        break;
      case DS2_ID:
        rb = ds2;
        break;
      case DS3_ID:
        rb = ds3;
        break;
      default:
        fail("Unknown ds server id.");
    }
    int nSec = 0;
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    {
      // Test connection
      boolean connected = false;
      if (rd != null)
      {
        connected = rd.isConnected();
      }
      else
      {
        connected = rb.isConnected();
      }
      if (connected)
      {
        // Connection verified
        debugInfo("checkConnection: connection of DS " + dsId +
          " to RS obtained after " + nSec + " seconds.");
        return;
      }
      Thread.sleep(100);
      nSec++;
      // Timeout reached, end with error
      assertFalse(nSec > secTimeout * 10,
          "checkConnection: DS " + dsId + " is not connected to the RS after "
          + secTimeout + " seconds.");
    case DS1_ID:
      return ds1.isConnected();
    case DS2_ID:
      return ds2.isConnected();
    case DS3_ID:
      return ds3.isConnected();
    default:
      fail("Unknown ds server id.");
      return false;
    }
  }
  /**
   * Creates a new ReplicationServer.
   */
  /** Creates a new ReplicationServer. */
  private ReplicationServer createReplicationServer(String testCase,
      int degradedStatusThreshold) throws Exception
  {
@@ -195,10 +195,7 @@
    return new ReplicationServer(conf);
  }
  /**
   * Creates and starts a new ReplicationDomain configured for the replication
   * server.
   */
  /** Creates and starts a new ReplicationDomain configured for the replication server. */
  @SuppressWarnings("unchecked")
  private LDAPReplicationDomain createReplicationDomain(int dsId) throws Exception
  {
@@ -263,25 +260,17 @@
    try
    {
      /**
       * DS1 start, no RS available: DS1 should be in not connected status
       */
      // DS1 start, no RS available: DS1 should be in not connected status
      ds1 = createReplicationDomain(DS1_ID);
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
      /**
       * RS1 starts , DS1 should connect to it and be in normal status
       */
      // RS1 starts , DS1 should connect to it and be in normal status
      rs1 = createReplicationServer(testCase, 5000);
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * RS1 stops, DS1 should go in not connected status
       */
      // RS1 stops, DS1 should go in not connected status
      rs1.remove();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
    } finally
    {
      endTest();
@@ -324,26 +313,21 @@
    try
    {
      /**
       * RS1 starts with specified threshold value
       */
      /* RS1 starts with specified threshold value */
      rs1 = createReplicationServer(testCase, thresholdValue);
      /**
       * DS2 starts and connects to RS1. No reader and low window value at the
       * beginning so writer for DS2 in RS should enqueue changes after first
       * changes sent to DS. (window value reached: a window msg needed by RS for
       * following sending changes to DS)
      /*
       * DS2 starts and connects to RS1. No reader and low window value at the beginning
       * so writer for DS2 in RS should enqueue changes after first changes sent to DS.
       * (window value reached: a window msg needed by RS for following sending changes to DS)
       */
      ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      /**
       * DS3 starts and connects to RS1
       */
      /* DS3 starts and connects to RS1 */
      ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID);
      br3 = new BrokerReader(ds3, DS3_ID);
      checkConnection(30, DS3_ID);
      waitUntiConnected(DS3_ID);
      // Send first changes to reach window and block DS2 writer queue. Writer will take them
      // from queue and block (no more changes removed from writer queue) after
@@ -351,10 +335,9 @@
      bw = new BrokerWriter(ds3, DS3_ID, false);
      bw.followAndPause(11);
      /**
       * DS3 sends changes (less than threshold): DS2 should still be in normal
       * status so no topo message should be sent (update topo message
       * for telling status of DS2 changed)
      /*
       * DS3 sends changes (less than threshold): DS2 should still be in normal status
       * so no topo message should be sent (update topo message for telling status of DS2 changed)
       */
      int nChangesSent = 0;
      if (thresholdValue > 1)
@@ -367,49 +350,39 @@
        assertNull(msg, (msg != null) ? msg.toString() : "null");
      }
      /**
       * DS3 sends changes to reach the threshold value, DS3 should receive an
       * update topo message with status of DS2: degraded status
      /*
       * DS3 sends changes to reach the threshold value,
       * DS3 should receive an update topo message with status of DS2: degraded status
       */
      bw.followAndPause(thresholdValue - nChangesSent);
      // wait for a status MSG status analyzer to broker 3
      waitForDegradedStatusOnBroker3();
      /**
       * DS3 sends 10 additional changes after threshold value, DS2 should still be
       * degraded so no topo message received.
      /*
       * DS3 sends 10 additional changes after threshold value,
       * DS2 should still be degraded so no topo message received.
       */
      bw.followAndPause(10);
      bw.shutdown();
      shutdown(bw);
      Thread.sleep(1000); // Be sure status analyzer has time to test
      ReplicationMsg lastMsg = br3.getLastMsg();
      ReplicationMsg msg = br3.getLastMsg();
      debugInfo(testCase + " Step 3: last message from writer: " + msg);
      assertNull(lastMsg);
      /**
      /*
       * DS2 replays every changes and should go back to normal status
       * (create a reader to emulate replay of messages (messages read from queue))
       */
      br2 = new BrokerReader(ds2, DS2_ID);
      // wait for a status MSG status analyzer to broker 3
      waitForDegradedStatusOnBroker3();
    } finally
    {
      endTest();
      if (bw != null)
      {
        bw.shutdown();
      }
      if (br3 != null)
      {
        br3.shutdown();
      }
      if (br2 != null)
      {
        br2.shutdown();
      }
      shutdown(bw);
      shutdown(br3);
      shutdown(br2);
    }
  }
@@ -462,85 +435,72 @@
      // DS2 starts and connects to RS1
      ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      // DS2 starts sending a lot of changes
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      Thread.sleep(1000); // Let some messages being queued in RS
      /**
       * DS1 starts and connects to RS1, server state exchange should lead to
       * start in degraded status as some changes should be in queued in the RS
       * and the threshold value is 1 change in queue.
      /*
       * DS1 starts and connects to RS1, server state exchange should lead to start in degraded status
       * as some changes should be in queued in the RS and the threshold value is 1 change in queue.
       */
      ds1 = createReplicationDomain(DS1_ID);
      checkConnection(30, DS1_ID);
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      waitUntiConnected(DS1_ID);
      waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
      /**
       * DS2 stops sending changes: DS1 should replay pending changes and should
       * enter the normal status
       */
      /* DS2 stops sending changes: DS1 should replay pending changes and should enter the normal status */
      bw.pause();
      // Sleep enough so that replay can be done and analyzer has time
      // to see that the queue length is now under the threshold value.
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * RS1 stops to make DS1 go to not connected status (from normal status)
       */
      /* RS1 stops to make DS1 go to not connected status (from normal status) */
      rs1.remove();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
      /**
       * DS2 restarts with up to date server state (this allows to have
       * restarting RS1 not sending him some updates he already sent)
      /*
       * DS2 restarts with up to date server state
       * (this allows to have restarting RS1 not sending him some updates he already sent)
       */
      ds2.stop();
      bw.shutdown();
      br.shutdown();
      shutdown(bw);
      shutdown(br);
      ServerState curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);
      /**
       * RS1 restarts, DS1 should get back to normal status
       */
      /* RS1 restarts, DS1 should get back to normal status */
      rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
      checkConnection(30, DS2_ID);
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      waitUntiConnected(DS2_ID);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * DS2 sends again a lot of changes to make DS1 degraded again
       */
      /* DS2 sends again a lot of changes to make DS1 degraded again */
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
      /**
       * RS1 stops to make DS1 go to not connected status (from degraded status)
       */
      /* RS1 stops to make DS1 go to not connected status (from degraded status) */
      rs1.remove();
      bw.pause();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
      /**
       * DS2 restarts with up to date server state (this allows to have
       * restarting RS1 not sending him some updates he already sent)
      /*
       * DS2 restarts with up to date server state
       * (this allows to have restarting RS1 not sending him some updates he already sent)
       */
      ds2.stop();
      bw.shutdown();
      br.shutdown();
      shutdown(bw);
      shutdown(br);
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);
      /**
       * RS1 restarts, DS1 should reconnect in degraded status (from not connected
       * this time, not from state machine entry)
      /*
       * RS1 restarts, DS1 should reconnect in degraded status
       * (from not connected this time, not from state machine entry)
       */
      rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
      // It is too difficult to tune the right sleep so disabling this test:
@@ -548,149 +508,130 @@
      // of DS1 to NORMAL_STATUS
      //sleep(2000);
      //sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      /**
       * DS1 should come back in normal status after a while
       */
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      /* DS1 should come back in normal status after a while */
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * DS2 sends a reset gen id order with wrong gen id: DS1 should go into bad generation id status
      /*
       * DS2 sends a reset gen id order with wrong gen id:
       * DS1 should go into bad generation id status
       */
      long BAD_GEN_ID = 999999L;
      resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
      /**
       * DS2 sends again a reset gen id order with right id: DS1 should be disconnected
       * by RS then reconnect and enter again in normal status. This goes through
       * not connected status but not possible to check as should reconnect immediately
      /*
       * DS2 sends again a reset gen id order with right id: DS1 should be disconnected by RS
       * then reconnect and enter again in normal status. This goes through not connected status
       * but not possible to check as should reconnect immediately
       */
      resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected
      ds2.stop();
      br.shutdown(); // Reader could reconnect broker, but gen id would be bad: need to recreate a broker to send changex
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      shutdown(br); // Reader could reconnect broker, but gen id would be bad: need to recreate a
                    // broker to send changex
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * DS2 sends again a lot of changes to make DS1 degraded again
       */
      /* DS2 sends again a lot of changes to make DS1 degraded again */
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
      /**
       * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id
       * status (from degraded status this time)
      /*
       * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id status
       * (from degraded status this time)
       */
      resetGenId(ds2, -1); // -1 to allow next step full update and flush RS db so that DS1 can reconnect after full update
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
      bw.pause();
      /**
       * DS2 engages full update (while DS1 in bad gen id status), DS1 should go
       * in full update status
      /*
       * DS2 engages full update (while DS1 in bad gen id status),
       * DS1 should go in full update status
       */
      BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 200);
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
      /**
      /*
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to normal status (RS genid was -1 so RS will adopt ne genb id)
       * and come back to normal status (RS genid was -1 so RS will adopt new gen id)
       */
      bi.runFullUpdate();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * DS2 sends changes to DS1: DS1 should go in degraded status
       */
      /* DS2 sends changes to DS1: DS1 should go in degraded status */
      ds2.stop(); // will need a new broker with another gen id restart it
      bw.shutdown();
      br.shutdown();
      shutdown(bw);
      shutdown(br);
      long newGen = ds1.getGenerationID();
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, newGen);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.DEGRADED_STATUS);
      /**
       * DS2 engages full update (while DS1 in degraded status), DS1 should go
       * in full update status
       */
      /* DS2 engages full update (while DS1 in degraded status), DS1 should go in full update status */
      bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 300);
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
      bw.pause();
      /**
      /*
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200))
       * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200)
       */
      bi.runFullUpdate();
      sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.BAD_GEN_ID_STATUS);
      /**
       * DS2 sends reset gen id with gen id same as DS1: DS1 will be disconnected
       * by RS (not connected status) and come back to normal status
      /*
       * DS2 sends reset gen id with gen id same as DS1:
       * DS1 will be disconnected by RS (not connected status) and come back to normal status
       */
      ds2.stop(); // will need a new broker with another gen id restart it
      bw.shutdown();
      br.shutdown();
      shutdown(bw);
      shutdown(br);
      newGen = ds1.getGenerationID();
      curState = ds1.getServerState();
      ds2 = createReplicationBroker(DS2_ID, curState, newGen);
      checkConnection(30, DS2_ID);
      waitUntiConnected(DS2_ID);
      br = new BrokerReader(ds2, DS2_ID);
      resetGenId(ds2, newGen); // Make DS1 reconnect in normal status
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * DS2 engages full update (while DS1 in normal status), DS1 should go
       * in full update status
       */
      /* DS2 engages full update (while DS1 in normal status), DS1 should go in full update status */
      bi = new BrokerInitializer(ds2, DS2_ID, false);
      bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS
      sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.FULL_UPDATE_STATUS);
      /**
      /*
       * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
       * and come back to normal status (process full update with same data as
       * before so RS already has right gen id: version with 300 entries)
       * and come back to normal status (process full update with same data as before so RS already
       * has right gen id: version with 300 entries)
       */
      bi.runFullUpdate();
      ds2.stop();
      br.shutdown();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
      shutdown(br);
      waitUntilStatusEquals(ds1, ServerStatus.NORMAL_STATUS);
      /**
       * RS1 stops, DS1 should go to not connected status
       */
      /* RS1 stops, DS1 should go to not connected status */
      rs1.remove();
      sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
      waitUntilStatusEquals(ds1, ServerStatus.NOT_CONNECTED_STATUS);
    } finally
    {
      // Finalize test
      endTest();
      if (bw != null)
      {
        bw.shutdown();
      }
      if (br != null)
      {
        br.shutdown();
      }
      shutdown(bw);
      shutdown(br);
    }
  }
@@ -750,7 +691,6 @@
   */
  private class BrokerInitializer
  {
    private ReplicationBroker rb;
    private int serverId = -1;
    private long userId;
@@ -769,9 +709,7 @@
     */
    private BrokerReader reader;
    /**
     * Creates a broker initializer. Also creates a reader according to request
     */
    /** Creates a broker initializer. Also creates a reader according to request */
    public BrokerInitializer(ReplicationBroker rb, int serverId,
      boolean createReader)
    {
@@ -780,9 +718,7 @@
      this.createReader = createReader;
    }
    /**
     * Initializes a full update session by sending InitializeTargetMsg.
     */
    /** Initializes a full update session by sending InitializeTargetMsg. */
    public void initFullUpdate(int destId, long nEntries)
    {
      // Also create reader ?
@@ -860,19 +796,16 @@
      if (createReader)
      {
        reader.shutdown();
        shutdown(reader);
      }
      debugInfo("Broker " + serverId + " initializer thread is dying");
    }
  }
  /**
   * Thread for sending a lot of changes through a broker.
   */
  /** Thread for sending a lot of changes through a broker. */
  private class BrokerWriter extends Thread
  {
    private ReplicationBroker rb;
    private int serverId = -1;
    private long userId;
@@ -993,9 +926,7 @@
      debugInfo("Broker " + serverId + " writer thread is dying");
    }
    /**
     * Stops the writer thread.
     */
    /** Stops the writer thread. */
    public void shutdown()
    {
      suspended.set(true); // If were working
@@ -1013,15 +944,10 @@
      }
      // Stop reader if any
      if (reader != null)
      {
        reader.shutdown();
      }
      StateMachineTest.shutdown(reader);
    }
    /**
     * Suspends the writer thread.
     */
    /** Suspends the writer thread. */
    public void pause()
    {
      if (isPaused())
@@ -1036,17 +962,13 @@
      }
    }
    /**
     * Test if the writer is suspended.
     */
    /** Test if the writer is suspended. */
    public boolean isPaused()
    {
      return sessionDone.get();
    }
    /**
     * Resumes the writer thread until it is paused.
     */
    /** Resumes the writer thread until it is paused. */
    public void follow()
    {
      sessionDone.set(false);
@@ -1131,7 +1053,6 @@
   */
  private class BrokerReader extends Thread
  {
    private ReplicationBroker rb;
    private int serverId = -1;
    private boolean shutdown;
@@ -1171,9 +1092,7 @@
      debugInfo("Broker " + serverId + " reader thread is dying");
    }
    /**
     * Returns last received message from reader When read, last value is cleared.
     */
    /** Returns last received message from reader When read, last value is cleared. */
    public ReplicationMsg getLastMsg()
    {
      ReplicationMsg toReturn = lastMsg;
@@ -1197,51 +1116,27 @@
  }
  /**
   * Waits for a long time for an equality condition to be true.
   * Every second, the equality check is performed. After the provided amount of
   * seconds, if the equality is false, an assertion error is raised.
   * This methods ends either because the equality is true or if the timeout
   * occurs after the provided number of seconds.
   * This method is convenient when the the equality can only occur after a
   * period of time which is difficult to establish, but we know it will occur
   * anyway. This has 2 advantages compared to a classical code like this:
   * - sleep(some time);
   * - assertEquals(testedValue, expectedValue);
   * 1. If the sleep value is too big, this will impact the total time of
   * running tests uselessly. It may also penalize a fast running machine where
   * the sleep time value may be unnecessarily to long.
   * 2. If the sleep value is too small, some slow machines may have the test
   * fail whereas some additional time would have made the test succeed.
   * @param secTimeout Number of seconds to wait before failing. The value for
   * this should be high. A timeout is needed anyway to have the test campaign
   * finish anyway.
   * @param testedValue The value we want to test
   * @param expectedValue The value the tested value should be equal to
   * Waits until the domain status reaches the expected status.
   * @param domain The domain whose status we want to test
   * @param expectedStatus The expected domain status
   */
  private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue,
    ServerStatus expectedValue) throws Exception
  private void waitUntilStatusEquals(final LDAPReplicationDomain domain, final ServerStatus expectedStatus) throws Exception
  {
    assertTrue(testedValue != null && expectedValue != null, "sleepAssertStatusEquals: null parameters");
    assertNotNull(domain);
    assertNotNull(expectedStatus);
    // Go out of the loop only if equality is obtained or if timeout occurs
    int nSec = 0;
    while (true)
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(30, SECONDS)
      .sleepTimes(500, MILLISECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      Thread.sleep(1000);
      nSec++;
      // Test equality of values
      if (testedValue.getStatus().equals(expectedValue))
      @Override
      public Void call() throws Exception
      {
        debugInfo("sleepAssertStatusEquals: equality obtained after "
          + nSec + " seconds (" + expectedValue + ").");
        return;
        assertEquals(domain.getStatus(), expectedStatus);
        return null;
      }
      // Timeout reached, end with error
      assertTrue(nSec < secTimeout, "sleepAssertStatusEquals: got <"
          + testedValue.getStatus() + "> where expected <" + expectedValue
          + ">");
    }
    });
  }
}