mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.20.2007 aefad4e6cfa33be9eca8cda45861d98236d0faed
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -340,6 +340,7 @@
      {
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        newSocket.setTcpNoDelay(true);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
@@ -414,11 +415,12 @@
                     InetAddress.getByName(hostname), Integer.parseInt(port));
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
                                      new SocketSession(socket), queueSize);
      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     handler.start(baseDn, serverId, this.serverURL, rcvWindow, this);
    }
    catch (IOException e)
    {
@@ -545,6 +547,7 @@
    }
    dbEnv.shutdown();
    DirectoryServer.deregisterConfigurableComponent(this);
  }
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@
  public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
                throws DatabaseException, Exception
  {
    if (changeNumber == null)
      changeNumber = readFirstChange();
    if (changeNumber == null)
      return null;
    return new ChangelogCursor(changeNumber);
  }
@@ -319,13 +315,16 @@
    {
      cursor = db.openCursor(txn, null);
      DatabaseEntry key = new ChangelogKey(startingChangeNumber);
      DatabaseEntry data = new DatabaseEntry();
      if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
        OperationStatus.SUCCESS)
      if (startingChangeNumber != null)
      {
        throw new Exception("ChangeNumber not available");
        key = new ChangelogKey(startingChangeNumber);
        data = new DatabaseEntry();
        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
          OperationStatus.SUCCESS)
        {
          throw new Exception("ChangeNumber not available");
        }
      }
    }
@@ -373,7 +372,7 @@
    }
    /**
     * Get the next ChangeNumber inthe database from this Cursor.
     * Get the next ChangeNumber in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.
     * @throws DatabaseException In case of underlying database problem.
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@
  private boolean shutdown = false;
  private boolean done = false;
  private DirectoryThread thread = null;
  private Object flushLock = new Object();
  /**
   * Creates a New dbHandler associated to a given LDAP server.
@@ -204,6 +205,12 @@
  public ChangelogIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
     * make sure to flush some changes in the database so that
     * we don't create the iterator on an empty database when the
     * dbHandler has just been started.
     */
    flush();
    return new ChangelogIterator(serverId, db, changeNumber);
  }
@@ -320,17 +327,22 @@
      while ((size < 5000 ) &&  (!finished))
      {
        ChangeNumber changeNumber = cursor.nextChangeNumber();
        if ((changeNumber != null) && (!changeNumber.equals(lastChange))
            && (changeNumber.older(trimDate)))
        if (changeNumber != null)
        {
          size++;
          cursor.delete();
          if ((!changeNumber.equals(lastChange))
              && (changeNumber.older(trimDate)))
          {
            size++;
            cursor.delete();
          }
          else
          {
            firstChange = changeNumber;
            finished = true;
          }
        }
        else
        {
          firstChange = changeNumber;
          finished = true;
        }
      }
      cursor.close();
@@ -350,19 +362,21 @@
    do
    {
      // get N messages to save in the DB
      List<UpdateMessage> changes = getChanges(500);
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(500);
      // if no more changes to save exit immediately.
      if ((changes == null) || ((size = changes.size()) == 0))
        return;
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
          return;
      // save the change to the stable storage.
      db.addEntries(changes);
        // save the change to the stable storage.
        db.addEntries(changes);
      // remove the changes from the list of changes to be saved.
      clear(changes.size());
        // remove the changes from the list of changes to be saved.
        clear(changes.size());
      }
    } while (size >=500);
  }
@@ -387,19 +401,17 @@
      attributes.add(new Attribute("changelog-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      ChangeNumber first = getFirstChange();
      ChangeNumber last = getLastChange();
      if (first != null)
      if (firstChange != null)
      {
        Date firstTime = new Date(first.getTime());
        Date firstTime = new Date(firstChange.getTime());
        attributes.add(new Attribute("first-change",
            first.toString() + " " + firstTime.toString()));
            firstChange.toString() + " " + firstTime.toString()));
      }
      if (last != null)
      if (lastChange != null)
      {
        Date lastTime = new Date(last.getTime());
        Date lastTime = new Date(lastChange.getTime());
        attributes.add(new Attribute("last-change",
            last.toString() + " " + lastTime.toString()));
            lastChange.toString() + " " + lastTime.toString()));
      }
      return attributes;
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -184,6 +184,7 @@
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.setTcpNoDelay(true);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,57 +118,18 @@
      return;
    savedStatus = true;
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    boolean done = false;
    while (!done)
    ResultCode resultCode = updateStateEntry();
    if (resultCode != ResultCode.SUCCESS)
    {
      /*
       * Generate a modify operation on the Server State Entry :
       * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
       */
      ModifyOperation op =
        new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            new ArrayList<Control>(0), serverStateAsn1Dn,
            mods);
      op.setInternalOperation(true);
      op.setSynchronizationOperation(true);
      op.run();
      ResultCode resultCode = op.getResultCode();
      if (resultCode != ResultCode.SUCCESS)
      if (resultCode == ResultCode.NO_SUCH_OBJECT)
      {
        if (resultCode == ResultCode.NO_SUCH_OBJECT)
        {
          createStateEntry();
        }
        else
        {
          savedStatus = false;
          int msgID = MSGID_ERROR_UPDATING_RUV;
          String message = getMessage(msgID,
              op.getResultCode().getResultCodeName(),
              op.toString(), op.getErrorMessage(),
              baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          break;
        }
        createStateEntry();
      }
      else
        done = true;
      {
        savedStatus = false;
      }
    }
  }
@@ -297,6 +258,51 @@
  }
  /**
   * Save the current values of this PersistentState object
   * in the appropiate entry of the database.
   *
   * @return a ResultCode indicating if the method was successfull.
   */
  private ResultCode updateStateEntry()
  {
    /*
     * Generate a modify operation on the Server State Entry :
     * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
     */
    ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
    if (values.size() == 0)
      return ResultCode.SUCCESS;
    LDAPAttribute attr =
      new LDAPAttribute(SYNCHRONIZATION_STATE, values);
    LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
    ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
    mods.add(mod);
    ModifyOperation op =
      new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          new ArrayList<Control>(0), serverStateAsn1Dn,
          mods);
    op.setInternalOperation(true);
    op.setSynchronizationOperation(true);
    op.run();
    ResultCode result = op.getResultCode();
    if (result != ResultCode.SUCCESS)
    {
      int msgID = MSGID_ERROR_UPDATING_RUV;
      String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
          op.toString(), op.getErrorMessage(), baseDn.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    return result;
  }
  /**
   * Get the Dn where the ServerState is stored.
   * @return Returns the serverStateDn.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -95,14 +95,14 @@
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
        WINDOW_SIZE, 8989, 1000);
        WINDOW_SIZE, 8989, 1000, true);
    try {
      /* Test that changelog monitor and synchro plugin monitor informations
       * publish the correct window size.
       * This allows both the check the monitoring code and to test that
@@ -111,7 +111,7 @@
      Thread.sleep(1500);
      assertTrue(checkWindows(WINDOW_SIZE));
      assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE));
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
      AddOperation addOp = new AddOperation(connection,
@@ -138,7 +138,7 @@
        "The received ADD synchronization message is not for the excepted DN");
      // send (2 * window + changelog queue) modify operations
      // so that window + changelog queue get stuck in the changelog queue
      // so that window + changelog queue get stuck in the changelog queue
      int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
      processModify(count);
@@ -173,7 +173,7 @@
  /**
   * Check that the Changelog queue size has correctly been configured
   * by reading the monitoring information.
   * @throws LDAPException
   * @throws LDAPException
   */
  private boolean checkChangelogQueueSize(int changelog_queue_size)
          throws LDAPException
@@ -188,7 +188,7 @@
  /**
   * Check that the window configuration has been successfull
   * by reading the monitoring information and checking
   * by reading the monitoring information and checking
   * that we do have 2 entries with the configured max-rcv-window.
   */
  private boolean checkWindows(int windowSize) throws LDAPException
@@ -200,7 +200,7 @@
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 3);
  }
  /**
   * Search that the changelog has stopped sending changes after
   * having reach the limit of the window size.
@@ -216,7 +216,7 @@
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    if (op.getEntriesSent() != 1)
      return false;
    op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -89,7 +89,7 @@
  private String changeLogStringDN;
  private BrokerReader reader = null;
  /**
   * A "person" entry
   */
@@ -106,13 +106,13 @@
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting Synchronization StressTest : fromServertoBroker" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final int TOTAL_MESSAGES = 1000;
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000);
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
    Monitor monitor = new Monitor("stress test monitor");
    DirectoryServer.registerMonitorProvider(monitor);
@@ -212,7 +212,7 @@
    // Create an internal connection
    connection = new InternalClientConnection();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
@@ -411,7 +411,7 @@
      return count;
    }
  }
  private class Monitor extends MonitorProvider
  {
    protected Monitor(String threadName)
@@ -445,7 +445,7 @@
    public void updateMonitorData()
    {
      // nothing to do
    }
    @Override
@@ -453,7 +453,7 @@
    throws ConfigException, InitializationException
    {
      // nothing to do
    }
    @Override
@@ -463,7 +463,7 @@
      return 0;
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -52,7 +53,7 @@
import org.testng.annotations.BeforeClass;
/**
 * An abstract class that all synchronization unit test should extend.
 * An abstract class that all synchronization unit test should extend.
 */
@Test(groups = { "precommit", "synchronization" })
public abstract class SynchronizationTestCase extends DirectoryServerTestCase
@@ -91,7 +92,7 @@
  /**
   * Set up the environment for performing the tests in this suite.
   *
   *
   * @throws Exception
   *         If the environment could not be set up.
   */
@@ -100,21 +101,24 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // Create an internal connection
    connection = new InternalClientConnection();
  }
  /**
   * Open a changelog session to the local Changelog server.
   *
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size, int port, int timeout)
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    state.loadState();
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
@@ -122,22 +126,45 @@
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    if (emptyOldChanges)
    {
      while (true)
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        broker.receive();
        while (true)
        {
          broker.receive();
        }
      }
      catch (Exception e)
      { }
    }
    catch (Exception e)
    { }
    return broker;
  }
  /**
   * Open a new session to the Changelog Server
   * starting with a given ServerState.
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception, SocketException
  {
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    return broker;
  }
  /**
   * suppress all the entries created by the tests in this class
   */
@@ -149,11 +176,11 @@
    {
      while (true)
      {
        DN dn = entryList.removeLast();
        DN dn = entryList.removeLast();
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op.run();;
      }
    }
@@ -172,7 +199,7 @@
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    // WORKAROUND FOR BUG #639 - BEGIN -
    if (mms != null)
    {
@@ -180,7 +207,7 @@
      mms.finalizeSynchronizationProvider();
    }
    // WORKAROUND FOR BUG #639 - END -
    cleanEntries();
  }
@@ -196,7 +223,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster synchronization plugin");
    // WORKAROUND FOR BUG #639 - BEGIN -
    DN dn = DN.decode(synchroPluginStringDN);
    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
@@ -212,14 +239,14 @@
    }
    DirectoryServer.registerSynchronizationProvider(mms);
    // WORKAROUND FOR BUG #639 - END -
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry.getDN());
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -84,7 +84,7 @@
  private String user1entrysecondUUID;
  private String user1entryUUID;
  /**
   * A "person" entry
   */
@@ -254,7 +254,7 @@
     * This must use a serverId different from the LDAP server ID
     */
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
@@ -660,7 +660,7 @@
    cleanEntries();
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
    try {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
@@ -883,7 +883,7 @@
          break;
        }
      }
      if (lock == null)
      {
        throw new Exception("could not lock entry " + dn);
@@ -892,8 +892,8 @@
      try
      {
        newEntry = DirectoryServer.getEntry(personWithUUIDEntry.getDN());
        if (newEntry == null)
          fail("The entry " + personWithUUIDEntry.getDN() +
          " has incorrectly been deleted from the database.");
@@ -903,13 +903,13 @@
        AttributeType attrType =
          DirectoryServer.getAttributeType(attrTypeStr, true);
        found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
      }
      finally
      {
        LockManager.unlock(dn, lock);
      }
      if (found != hasAttribute)
        Thread.sleep(100);
    } while ((--count > 0) && (found != hasAttribute));
@@ -918,7 +918,7 @@
  /**
   *  Get the entryUUID for a given DN.
   *
   *
   * @throws Exception if the entry does not exist or does not have
   *                   an entryUUID.
   */
@@ -932,7 +932,7 @@
    while ((count> 0) && (found == null))
    {
      Thread.sleep(100);
      Lock lock = null;
      for (int i=0; i < 3; i++)
      {
@@ -942,7 +942,7 @@
          break;
        }
      }
      if (lock == null)
      {
        throw new Exception("could not lock entry " + dn);
@@ -951,7 +951,7 @@
      try
      {
        newEntry = DirectoryServer.getEntry(dn);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
@@ -991,11 +991,11 @@
    while ((count> 0) && (found != exist))
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
    Lock lock = null;
    for (int i=0; i < 3; i++)
    {
@@ -1005,7 +1005,7 @@
        break;
      }
    }
    if (lock == null)
    {
      throw new Exception("could not lock entry " + dn);
@@ -1062,7 +1062,7 @@
    Thread.sleep(2000);
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000);
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
    try
    {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -30,33 +30,56 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.DirectoryServer;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Tests for the chngelog service code.
 * Tests for the changelog service code.
 */
public class ChangelogTest extends SynchronizationTestCase
{
  /**
   * Basic test of the changelog code.
   * Create a changelog server, connect 2 clients and exchange
   * messages between the clients.
   * The changelog server that will be used in this test.
   */
  @Test()
  public void changelogBasic() throws Exception
  private Changelog changelog = null;
  /**
   * The port of the changelog server.
   */
  private int changelogPort;
  private ChangeNumber firstChangeNumberServer1 = null;
  private ChangeNumber secondChangeNumberServer1 = null;
  private ChangeNumber firstChangeNumberServer2 = null;
  private ChangeNumber secondChangeNumberServer2 = null;
  /**
   * Before starting the tests, start the server and configure a
   * changelog server.
   */
  @BeforeClass()
  public void configure() throws Exception
  {
    // find  a free port
    TestCaseUtils.startServer();
    //  find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    changelogPort = socket.getLocalPort();
    socket.close();
    String changelogLdif =
@@ -65,39 +88,363 @@
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port: "+ changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 1\n";
        + "ds-cfg-changelog-server-id: 1\n"
        + "ds-cfg-window-size: 100";
    Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
    ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
    Changelog changelog = new Changelog(changelogConfig);
    changelog = new Changelog(changelogConfig);
  }
    ChangelogBroker broker1 = null;
    ChangelogBroker broker2 = null;
  /**
   * Basic test of the changelog code :
   *  Connect 2 clients to the changelog server and exchange messages
   *  between the clients.
   *
   * Note : Other tests in this file depends on this test and may need to
   *        change if this test is modified.
   */
  @Test()
  public void changelogBasic() throws Exception
  {
    ChangelogBroker server1 = null;
    ChangelogBroker server2 = null;
    try {
      broker1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000);
      broker2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000);
      /*
       * Open a sender session and a receiver session to the changelog
       */
      server1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
          1000, true);
      server2 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
          1000, true);
      ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1);
      DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid");
      broker1.publish(msg);
      SynchronizationMessage msg2 = broker2.receive();
      /*
       * Create change numbers for the messages sent from server 1
       * with current time  sequence 1 and with current time + 2 sequence 2
       */
      long time = TimeThread.getTime();
      firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1);
      secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1);
      /*
       * Create change numbers for the messages sent from server 2
       * with current time  sequence 1 and with current time + 3 sequence 2
       */
      firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2);
      secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2);
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      DeleteMsg msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1,
                      "uid");
      server1.publish(msg);
      SynchronizationMessage msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg2.toString()));
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog transmission failed");
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      msg =
        new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
    }
    finally
    {
      if (server1 != null)
        server1.stop();
      if (server2 != null)
        server2.stop();
    }
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClient() throws Exception
  {
    ChangelogBroker broker = null;
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, false);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
            "The first message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + firstChangeNumberServer1);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
   */
  private void newClientWithChanges(
      ServerState state, ChangeNumber nextChangeNumber) throws Exception
  {
    ChangelogBroker broker = null;
    /*
     * Connect to the changelog server using the state created above.
     */
    try {
      broker =
        openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, state);
      SynchronizationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(nextChangeNumber),
            "The second message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + nextChangeNumber);
      }
    }
    finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * Test that a client that has already seen the first change now see the
   * second change
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithFirstChanges() throws Exception
  {
    /*
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that a client that has already seen the first change from server 1
   * now see the first change from server 2
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, firstChangeNumberServer2);
  }
  /**
   * Test that a client that has already seen the first chaneg from server 2
   * now see the first change from server 1
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithChangefromServer2() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstChangeNumberServer2);
    newClientWithChanges(state, firstChangeNumberServer1);
  }
  /**
   * Test that a client that has not seen the second change from server 1
   * now receive it.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientLateServer1() throws Exception
  {
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(secondChangeNumberServer2);
    state.update(firstChangeNumberServer1);
    newClientWithChanges(state, secondChangeNumberServer1);
  }
  /**
   * Test that newClient() and newClientWithFirstChange() still works
   * after stopping and restarting the changelog server.
   */
  @Test(enabled=false, dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
    changelog.shutdown();
    configure();
    newClient();
    newClientWithFirstChanges();
    newClientWithChangefromServer1();
    newClientWithChangefromServer2();
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   */
  @Test(enabled=false, groups="slow")
  public void stressFromBrokertoChangelog() throws Exception
  {
    ChangelogBroker server = null;
    try
    {
      /*
       * Open a sender session
       */
      server = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          1000, true);
      BrokerReader reader = new BrokerReader(server);
      reader.start();
      ChangeNumberGenerator gen =
        new ChangeNumberGenerator((short)5 , (long) 0);
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      for (int i = 0; i< 100000; i++)
      {
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
          "uid");
        server.publish(msg);
      }
    }
    finally
    {
      if (server != null)
        server.stop();
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
      if (broker1 != null)
        broker1.stop();
      if (broker2 != null)
        broker2.stop();
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
        }
      } catch (Exception e) {
      }
    }
  }
}