mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.20.2007 aefad4e6cfa33be9eca8cda45861d98236d0faed
put back the 980 revision that had been removed because of a regression.
- fix the regression (schema problem)
- disabled the ChangelogTest.stopChangelog() test that appear to cause
some hangs.

----------

This change includes a set of new unit tests dedicated to the startup phase
of the synchronization server (former changelog server) and the connection
phase of client LDAP server to this synchronization server.
These tests cover various cases of this connection checking that the adequate
changes are then sent to the client.

This change also include the fix for :
issue 1155 : synchronization server miss one change when a new LDAP server is added
to the topology
10 files modified
575 ■■■■ changed files
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java 88 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 29 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 399 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -340,6 +340,7 @@
      {
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        newSocket.setTcpNoDelay(true);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
@@ -414,11 +415,12 @@
                     InetAddress.getByName(hostname), Integer.parseInt(port));
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket), queueSize);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     handler.start(baseDn, serverId, this.serverURL, rcvWindow, this);
    }
    catch (IOException e)
    {
@@ -545,6 +547,7 @@
    }
    dbEnv.shutdown();
    DirectoryServer.deregisterConfigurableComponent(this);
  }
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@
  public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
                throws DatabaseException, Exception
  {
    if (changeNumber == null)
      changeNumber = readFirstChange();
    if (changeNumber == null)
      return null;
    return new ChangelogCursor(changeNumber);
  }
@@ -319,8 +315,10 @@
    {
      cursor = db.openCursor(txn, null);
      DatabaseEntry key = new ChangelogKey(startingChangeNumber);
      DatabaseEntry data = new DatabaseEntry();
      if (startingChangeNumber != null)
      {
        key = new ChangelogKey(startingChangeNumber);
        data = new DatabaseEntry();
      if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
        OperationStatus.SUCCESS)
@@ -328,6 +326,7 @@
        throw new Exception("ChangeNumber not available");
      }
    }
    }
    private ChangelogCursor() throws DatabaseException
    {
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@
  private boolean shutdown = false;
  private boolean done = false;
  private DirectoryThread thread = null;
  private Object flushLock = new Object();
  /**
   * Creates a New dbHandler associated to a given LDAP server.
@@ -204,6 +205,12 @@
  public ChangelogIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
     * make sure to flush some changes in the database so that
     * we don't create the iterator on an empty database when the
     * dbHandler has just been started.
     */
    flush();
    return new ChangelogIterator(serverId, db, changeNumber);
  }
@@ -320,7 +327,9 @@
      while ((size < 5000 ) &&  (!finished))
      {
        ChangeNumber changeNumber = cursor.nextChangeNumber();
        if ((changeNumber != null) && (!changeNumber.equals(lastChange))
        if (changeNumber != null)
        {
          if ((!changeNumber.equals(lastChange))
            && (changeNumber.older(trimDate)))
        {
          size++;
@@ -332,6 +341,9 @@
          finished = true;
        }
      }
        else
          finished = true;
      }
      cursor.close();
    } catch (DatabaseException e)
@@ -350,6 +362,8 @@
    do
    {
      synchronized(flushLock)
      {
      // get N messages to save in the DB
      List<UpdateMessage> changes = getChanges(500);
@@ -362,7 +376,7 @@
      // remove the changes from the list of changes to be saved.
      clear(changes.size());
      }
    } while (size >=500);
  }
@@ -387,19 +401,17 @@
      attributes.add(new Attribute("changelog-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      ChangeNumber first = getFirstChange();
      ChangeNumber last = getLastChange();
      if (first != null)
      if (firstChange != null)
      {
        Date firstTime = new Date(first.getTime());
        Date firstTime = new Date(firstChange.getTime());
        attributes.add(new Attribute("first-change",
            first.toString() + " " + firstTime.toString()));
            firstChange.toString() + " " + firstTime.toString()));
      }
      if (last != null)
      if (lastChange != null)
      {
        Date lastTime = new Date(last.getTime());
        Date lastTime = new Date(lastChange.getTime());
        attributes.add(new Attribute("last-change",
            last.toString() + " " + lastTime.toString()));
            lastChange.toString() + " " + lastTime.toString()));
      }
      return attributes;
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -184,6 +184,7 @@
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.setTcpNoDelay(true);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,35 +118,7 @@
      return;
    savedStatus = true;
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    boolean done = false;
    while (!done)
    {
      /*
       * Generate a modify operation on the Server State Entry :
       * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
       */
      ModifyOperation op =
        new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            new ArrayList<Control>(0), serverStateAsn1Dn,
            mods);
      op.setInternalOperation(true);
      op.setSynchronizationOperation(true);
      op.run();
      ResultCode resultCode = op.getResultCode();
    ResultCode resultCode = updateStateEntry();
      if (resultCode != ResultCode.SUCCESS)
      {
        if (resultCode == ResultCode.NO_SUCH_OBJECT)
@@ -156,20 +128,9 @@
        else
        {
          savedStatus = false;
          int msgID = MSGID_ERROR_UPDATING_RUV;
          String message = getMessage(msgID,
              op.getResultCode().getResultCodeName(),
              op.toString(), op.getErrorMessage(),
              baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          break;
        }
      }
      else
        done = true;
    }
  }
  /**
@@ -297,6 +258,51 @@
  }
  /**
   * Save the current values of this PersistentState object
   * in the appropiate entry of the database.
   *
   * @return a ResultCode indicating if the method was successfull.
   */
  private ResultCode updateStateEntry()
  {
    /*
     * Generate a modify operation on the Server State Entry :
     * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
     */
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return ResultCode.SUCCESS;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    ModifyOperation op =
      new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          new ArrayList<Control>(0), serverStateAsn1Dn,
          mods);
    op.setInternalOperation(true);
    op.setSynchronizationOperation(true);
    op.run();
    ResultCode result = op.getResultCode();
    if (result != ResultCode.SUCCESS)
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    return result;
  }
  /**
   * Get the Dn where the ServerState is stored.
   * @return Returns the serverStateDn.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -99,7 +99,7 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
        WINDOW_SIZE, 8989, 1000);
        WINDOW_SIZE, 8989, 1000, true);
    try {
      
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -112,7 +112,7 @@
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000);
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
    Monitor monitor = new Monitor("stress test monitor");
    DirectoryServer.registerMonitorProvider(monitor);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -100,6 +101,7 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    
    // Create an internal connection
    connection = new InternalClientConnection();
@@ -110,10 +112,12 @@
   *
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size, int port, int timeout)
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    if (emptyOldChanges)
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
@@ -122,6 +126,8 @@
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
@@ -135,6 +141,27 @@
    }
    catch (Exception e)
    { }
    }
    return broker;
  }
  /**
   * Open a new session to the Changelog Server
   * starting with a given ServerState.
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception, SocketException
  {
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    return broker;
  }
  
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -254,7 +254,7 @@
     * This must use a serverId different from the LDAP server ID
     */
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
@@ -660,7 +660,7 @@
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
    try {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
@@ -1062,7 +1062,7 @@
    Thread.sleep(2000);
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
    try
    {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -30,33 +30,56 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Tests for the chngelog service code.
 * Tests for the changelog service code.
 */
public class ChangelogTest extends SynchronizationTestCase
{
  /**
   * Basic test of the changelog code.
   * Create a changelog server, connect 2 clients and exchange
   * messages between the clients.
   * The changelog server that will be used in this test.
   */
  @Test()
  public void changelogBasic() throws Exception
  private Changelog changelog = null;
  /**
   * The port of the changelog server.
   */
  private int changelogPort;
  private ChangeNumber firstChangeNumberServer1 = null;
  private ChangeNumber secondChangeNumberServer1 = null;
  private ChangeNumber firstChangeNumberServer2 = null;
  private ChangeNumber secondChangeNumberServer2 = null;
  /**
   * Before starting the tests, start the server and configure a
   * changelog server.
   */
  @BeforeClass()
  public void configure() throws Exception
  {
    // find  a free port
    TestCaseUtils.startServer();
    //  find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    changelogPort = socket.getLocalPort();
    socket.close();
    String changelogLdif =
@@ -65,39 +88,363 @@
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port: "+ changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 1\n";
        + "ds-cfg-changelog-server-id: 1\n"
        + "ds-cfg-window-size: 100";
    Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
    ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
    Changelog changelog = new Changelog(changelogConfig);
    changelog = new Changelog(changelogConfig);
  }
    ChangelogBroker broker1 = null;
    ChangelogBroker broker2 = null;
  /**
   * Basic test of the changelog code :
   *  Connect 2 clients to the changelog server and exchange messages
   *  between the clients.
   *
   * Note : Other tests in this file depends on this test and may need to
   *        change if this test is modified.
   */
  @Test()
  public void changelogBasic() throws Exception
  {
    ChangelogBroker server1 = null;
    ChangelogBroker server2 = null;
    
    try {
      broker1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000);
      broker2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000);
      /*
       * Open a sender session and a receiver session to the changelog
       */
      server1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
          1000, true);
      server2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
          1000, true);
      ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1);
      DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid");
      broker1.publish(msg);
      SynchronizationMessage msg2 = broker2.receive();
      /*
       * Create change numbers for the messages sent from server 1
       * with current time  sequence 1 and with current time + 2 sequence 2
       */
      long time = TimeThread.getTime();
      firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1);
      secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1);
      /*
       * Create change numbers for the messages sent from server 2
       * with current time  sequence 1 and with current time + 3 sequence 2
       */
      firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2);
      secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2);
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      DeleteMsg msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1,
                      "uid");
      server1.publish(msg);
      SynchronizationMessage msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg2.toString()));
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog transmission failed");
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
    }
    finally
    {
      if (server1 != null)
        server1.stop();
      if (server2 != null)
        server2.stop();
    }
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClient() throws Exception
  {
    ChangelogBroker broker = null;
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, false);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
            "The first message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + firstChangeNumberServer1);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
   */
  private void newClientWithChanges(
      ServerState state, ChangeNumber nextChangeNumber) throws Exception
  {
    ChangelogBroker broker = null;
    /*
     * Connect to the changelog server using the state created above.
     */
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, state);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(nextChangeNumber),
            "The second message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + nextChangeNumber);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen the first change now see the
   * second change
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithFirstChanges() throws Exception
  {
    /*
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that a client that has already seen the first change from server 1
   * now see the first change from server 2
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, firstChangeNumberServer2);
  }
  /**
   * Test that a client that has already seen the first chaneg from server 2
   * now see the first change from server 1
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer2() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, firstChangeNumberServer1);
  }
  /**
   * Test that a client that has not seen the second change from server 1
   * now receive it.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientLateServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(secondChangeNumberServer2);
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that newClient() and newClientWithFirstChange() still works
   * after stopping and restarting the changelog server.
   */
  @Test(enabled=false, dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
      changelog.shutdown();
      if (broker1 != null)
        broker1.stop();
      if (broker2 != null)
        broker2.stop();
    configure();
    newClient();
    newClientWithFirstChanges();
    newClientWithChangefromServer1();
    newClientWithChangefromServer2();
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   */
  @Test(enabled=false, groups="slow")
  public void stressFromBrokertoChangelog() throws Exception
  {
    ChangelogBroker server = null;
    try
    {
      /*
       * Open a sender session
       */
      server = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          1000, true);
      BrokerReader reader = new BrokerReader(server);
      reader.start();
      ChangeNumberGenerator gen =
        new ChangeNumberGenerator((short)5 , (long) 0);
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      for (int i = 0; i< 100000; i++)
      {
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
          "uid");
        server.publish(msg);
      }
    }
    finally
    {
      if (server != null)
        server.stop();
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
        }
      } catch (Exception e) {
      }
    }
  }
}