mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
16.10.2007 355271b680fd3e08e34900812520d7c355e538d3
This change includes a set of new unit tests dedicated to the startup phase
of the synchronization server (former changelog server) and the connection
phase of client LDAP server to this synchronization server.
These tests cover various cases of this connection checking that the adequate
changes are then sent to the client.

This change also include the fix for :
issue 1155 : synchronization server miss one change when a new LDAP server is added
to the topology
9 files modified
587 ■■■■ changed files
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java 62 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java 100 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 70 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 321 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -418,7 +418,7 @@
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket), queueSize);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     handler.start(baseDn, serverId, this.serverURL, rcvWindow, this);
    }
    catch (IOException e)
    {
@@ -545,6 +545,7 @@
    }
    dbEnv.shutdown();
    DirectoryServer.deregisterConfigurableComponent(this);
  }
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@
  public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
                throws DatabaseException, Exception
  {
    if (changeNumber == null)
      changeNumber = readFirstChange();
    if (changeNumber == null)
      return null;
    return new ChangelogCursor(changeNumber);
  }
@@ -319,13 +315,16 @@
    {
      cursor = db.openCursor(txn, null);
      DatabaseEntry key = new ChangelogKey(startingChangeNumber);
      DatabaseEntry data = new DatabaseEntry();
      if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
        OperationStatus.SUCCESS)
      if (startingChangeNumber != null)
      {
        throw new Exception("ChangeNumber not available");
        key = new ChangelogKey(startingChangeNumber);
        data = new DatabaseEntry();
        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
          OperationStatus.SUCCESS)
        {
          throw new Exception("ChangeNumber not available");
        }
      }
    }
@@ -373,7 +372,7 @@
    }
    /**
     * Get the next ChangeNumber inthe database from this Cursor.
     * Get the next ChangeNumber in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.
     * @throws DatabaseException In case of underlying database problem.
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@
  private boolean shutdown = false;
  private boolean done = false;
  private DirectoryThread thread = null;
  private Object flushLock = new Object();
  /**
   * Creates a New dbHandler associated to a given LDAP server.
@@ -204,6 +205,12 @@
  public ChangelogIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
     * make sure to flush some changes in the database so that
     * we don't create the iterator on an empty database when the
     * dbHandler has just been started.
     */
    flush();
    return new ChangelogIterator(serverId, db, changeNumber);
  }
@@ -320,17 +327,22 @@
      while ((size < 5000 ) &&  (!finished))
      {
        ChangeNumber changeNumber = cursor.nextChangeNumber();
        if ((changeNumber != null) && (!changeNumber.equals(lastChange))
            && (changeNumber.older(trimDate)))
        if (changeNumber != null)
        {
          size++;
          cursor.delete();
          if ((!changeNumber.equals(lastChange))
              && (changeNumber.older(trimDate)))
          {
            size++;
            cursor.delete();
          }
          else
          {
            firstChange = changeNumber;
            finished = true;
          }
        }
        else
        {
          firstChange = changeNumber;
          finished = true;
        }
      }
      cursor.close();
@@ -350,19 +362,21 @@
    do
    {
      // get N messages to save in the DB
      List<UpdateMessage> changes = getChanges(500);
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(500);
      // if no more changes to save exit immediately.
      if ((changes == null) || ((size = changes.size()) == 0))
        return;
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
          return;
      // save the change to the stable storage.
      db.addEntries(changes);
        // save the change to the stable storage.
        db.addEntries(changes);
      // remove the changes from the list of changes to be saved.
      clear(changes.size());
        // remove the changes from the list of changes to be saved.
        clear(changes.size());
      }
    } while (size >=500);
  }
@@ -387,19 +401,17 @@
      attributes.add(new Attribute("changelog-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      ChangeNumber first = getFirstChange();
      ChangeNumber last = getLastChange();
      if (first != null)
      if (firstChange != null)
      {
        Date firstTime = new Date(first.getTime());
        Date firstTime = new Date(firstChange.getTime());
        attributes.add(new Attribute("first-change",
            first.toString() + " " + firstTime.toString()));
            firstChange.toString() + " " + firstTime.toString()));
      }
      if (last != null)
      if (lastChange != null)
      {
        Date lastTime = new Date(last.getTime());
        Date lastTime = new Date(lastChange.getTime());
        attributes.add(new Attribute("last-change",
            last.toString() + " " + lastTime.toString()));
            lastChange.toString() + " " + lastTime.toString()));
      }
      return attributes;
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,57 +118,18 @@
      return;
    savedStatus = true;
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    boolean done = false;
    while (!done)
    ResultCode resultCode = updateStateEntry();
    if (resultCode != ResultCode.SUCCESS)
    {
      /*
       * Generate a modify operation on the Server State Entry :
       * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
       */
      ModifyOperation op =
        new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            new ArrayList<Control>(0), serverStateAsn1Dn,
            mods);
      op.setInternalOperation(true);
      op.setSynchronizationOperation(true);
      op.run();
      ResultCode resultCode = op.getResultCode();
      if (resultCode != ResultCode.SUCCESS)
      if (resultCode == ResultCode.NO_SUCH_OBJECT)
      {
        if (resultCode == ResultCode.NO_SUCH_OBJECT)
        {
          createStateEntry();
        }
        else
        {
          savedStatus = false;
          int msgID = MSGID_ERROR_UPDATING_RUV;
          String message = getMessage(msgID,
              op.getResultCode().getResultCodeName(),
              op.toString(), op.getErrorMessage(),
              baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          break;
        }
        createStateEntry();
      }
      else
        done = true;
      {
        savedStatus = false;
      }
    }
  }
@@ -297,6 +258,51 @@
  }
  /**
   * Save the current values of this PersistentState object
   * in the appropiate entry of the database.
   *
   * @return a ResultCode indicating if the method was successfull.
   */
  private ResultCode updateStateEntry()
  {
    /*
     * Generate a modify operation on the Server State Entry :
     * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
     */
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return ResultCode.SUCCESS;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    ModifyOperation op =
      new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          new ArrayList<Control>(0), serverStateAsn1Dn,
          mods);
    op.setInternalOperation(true);
    op.setSynchronizationOperation(true);
    op.run();
    ResultCode result = op.getResultCode();
    if (result != ResultCode.SUCCESS)
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    return result;
  }
  /**
   * Get the Dn where the ServerState is stored.
   * @return Returns the serverStateDn.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -99,7 +99,7 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
        WINDOW_SIZE, 8989, 1000);
        WINDOW_SIZE, 8989, 1000, true);
    try {
      
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -112,7 +112,7 @@
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000);
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
    Monitor monitor = new Monitor("stress test monitor");
    DirectoryServer.registerMonitorProvider(monitor);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -52,7 +53,7 @@
import org.testng.annotations.BeforeClass;
/**
 * An abstract class that all synchronization unit test should extend.
 * An abstract class that all synchronization unit test should extend.
 */
@Test(groups = { "precommit", "synchronization" })
public abstract class SynchronizationTestCase extends DirectoryServerTestCase
@@ -91,7 +92,7 @@
  /**
   * Set up the environment for performing the tests in this suite.
   *
   *
   * @throws Exception
   *         If the environment could not be set up.
   */
@@ -100,21 +101,23 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Create an internal connection
    connection = new InternalClientConnection();
  }
  /**
   * Open a changelog session to the local Changelog server.
   *
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size, int port, int timeout)
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    state.loadState();
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
@@ -122,23 +125,46 @@
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    if (emptyOldChanges)
    {
      while (true)
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        broker.receive();
        while (true)
        {
          broker.receive();
        }
      }
      catch (Exception e)
      { }
    }
    catch (Exception e)
    { }
    return broker;
  }
  
  /**
   * Open a new session to the Changelog Server
   * starting with a given ServerState.
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception, SocketException
  {
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    return broker;
  }
  /**
   * suppress all the entries created by the tests in this class
   */
  protected void cleanEntries()
@@ -149,11 +175,11 @@
    {
      while (true)
      {
        DN dn = entryList.removeLast();
        DN dn = entryList.removeLast();
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op.run();;
      }
    }
@@ -172,7 +198,7 @@
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    // WORKAROUND FOR BUG #639 - BEGIN -
    if (mms != null)
    {
@@ -180,7 +206,7 @@
      mms.finalizeSynchronizationProvider();
    }
    // WORKAROUND FOR BUG #639 - END -
    cleanEntries();
  }
@@ -196,7 +222,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster synchronization plugin");
    // WORKAROUND FOR BUG #639 - BEGIN -
    DN dn = DN.decode(synchroPluginStringDN);
    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
@@ -212,14 +238,14 @@
    }
    DirectoryServer.registerSynchronizationProvider(mms);
    // WORKAROUND FOR BUG #639 - END -
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry.getDN());
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -249,7 +249,7 @@
     * This must use a serverId different from the LDAP server ID
     */
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
@@ -562,7 +562,7 @@
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
    try {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
@@ -964,7 +964,7 @@
    Thread.sleep(2000);
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
    try
    {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -32,31 +32,52 @@
import org.opends.server.config.ConfigEntry;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Tests for the chngelog service code.
 * Tests for the changelog service code.
 */
public class ChangelogTest extends SynchronizationTestCase
{
  /**
   * Basic test of the changelog code.
   * Create a changelog server, connect 2 clients and exchange
   * messages between the clients.
   * The changelog server that will be used in this test.
   */
  @Test()
  public void changelogBasic() throws Exception
  private Changelog changelog = null;
  /**
   * The port of the changelog server.
   */
  private int changelogPort;
  private ChangeNumber firstChangeNumberServer1 = null;
  private ChangeNumber secondChangeNumberServer1 = null;
  private ChangeNumber firstChangeNumberServer2 = null;
  private ChangeNumber secondChangeNumberServer2 = null;
  /**
   * Before starting the tests, start the server and configure a
   * changelog server.
   */
  @BeforeClass()
  public void configure() throws Exception
  {
    // find  a free port
    TestCaseUtils.startServer();
    //  find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    changelogPort = socket.getLocalPort();
    socket.close();
    String changelogLdif =
@@ -68,36 +89,280 @@
        + "ds-cfg-changelog-server-id: 1\n";
    Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
    ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
    Changelog changelog = new Changelog(changelogConfig);
    changelog = new Changelog(changelogConfig);
  }
    ChangelogBroker broker1 = null;
    ChangelogBroker broker2 = null;
  /**
   * Basic test of the changelog code :
   *  Connect 2 clients to the changelog server and exchange messages
   *  between the clients.
   *
   * Note : Other tests in this file depends on this test and may need to
   *        change if this test is modified.
   */
  @Test()
  public void changelogBasic() throws Exception
  {
    ChangelogBroker server1 = null;
    ChangelogBroker server2 = null;
    try {
      broker1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000);
      broker2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000);
      ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1);
      DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid");
      broker1.publish(msg);
      SynchronizationMessage msg2 = broker2.receive();
      /*
       * Open a sender session and a receiver session to the changelog
       */
      server1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
          1000, true);
      server2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
          1000, true);
      /*
       * Create change numbers for the messages sent from server 1
       * with current time  sequence 1 and with current time + 2 sequence 2
       */
      long time = TimeThread.getTime();
      firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1);
      secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1);
      /*
       * Create change numbers for the messages sent from server 2
       * with current time  sequence 1 and with current time + 3 sequence 2
       */
      firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2);
      secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2);
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      DeleteMsg msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1,
                      "uid");
      server1.publish(msg);
      SynchronizationMessage msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg2.toString()));
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog transmission failed");
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
    }
    finally
    {
      changelog.shutdown();
      if (broker1 != null)
        broker1.stop();
      if (broker2 != null)
        broker2.stop();
      if (server1 != null)
        server1.stop();
      if (server2 != null)
        server2.stop();
    }
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void newClient() throws Exception
  {
    ChangelogBroker broker = null;
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, false);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
            "The first message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + firstChangeNumberServer1);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
   */
  private void newClientWithChanges(
      ServerState state, ChangeNumber nextChangeNumber) throws Exception
  {
    ChangelogBroker broker = null;
    /*
     * Connect to the changelog server using the state created above.
     */
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, state);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(nextChangeNumber),
            "The second message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + nextChangeNumber);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen the first change now see the
   * second change
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void newClientWithFirstChanges() throws Exception
  {
    /*
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that a client that has already seen the first change from server 1
   * now see the first change from server 2
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, firstChangeNumberServer2);
  }
  /**
   * Test that a client that has already seen the first chaneg from server 2
   * now see the first change from server 1
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer2() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, firstChangeNumberServer1);
  }
  /**
   * Test that a client that has not seen the second change from server 1
   * now receive it.
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void newClientLateServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(secondChangeNumberServer2);
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that newClient() and newClientWithFirstChange() still works
   * after stopping and restarting the changelog server.
   */
  @Test(dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
    changelog.shutdown();
    configure();
    newClient();
    newClientWithFirstChanges();
    newClientWithChangefromServer1();
    newClientWithChangefromServer2();
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
}