mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
29.21.2007 97c337fae8aa9247da4fd9ea3d7b9974887eb124
- Fix issue 1160 : synchronization is flushing the msgQueue to the database too often
The synchronization server is flushing all the queues of the messages
received from a LDAP server each time a server needs to retrieve some
changes that are not in memory anymore.

This cause the reading process of old changes very slow when there is also new changes coming in.

The servers that are already late can therefore become more and more late.

The solution is to detect the conditions when a flush is necessary by comparing the date of the changeNumber where
we want to start with the date of the first ChangeNumber
and only flush in those cases.

- Also fix some warning at compilation time and when using java 6

- Also fix an problem between test ConfigurableComponentTestCase and the ChangelogTest
because ConfigurableComponentTestCase expect all component to be configurable while
the dynamic configuration code for the Changelog is not yet implemented.

- 2 new unit tests are also included :

. scalability test of a synchronization server when one LDAP server
is used as a master and several other LDAP servers are used as
read only servers :
org.opends.server.synchronization.changelog.ChangelogTest.OneWriterMultipleReader()

. scalability test of a sycnhronization server when several LDAP Servers are used as master simultaneously : org.opends.server.synchronization.changelog.ChangelogTest.MultipleWriterMultipleReader()

These tests do not instantiate the LDAP servers but simulate them
by using directly the ChangelogBroker API.

The second is failing (issue 1162) and is therefore disabled.
4 files modified
363 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java 35 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 39 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 287 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -328,7 +328,7 @@
      List<String> unacceptableReasons)
  {
    // TODO NYI
    return false;
    return true;
  }
  /**
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -35,6 +35,7 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -206,11 +207,37 @@
                           throws DatabaseException, Exception
  {
    /*
     * make sure to flush some changes in the database so that
     * we don't create the iterator on an empty database when the
     * dbHandler has just been started.
     * When we create an iterator we need to make sure that we
     * don't miss some changes because the iterator is created
     * close to the limit of the changed that have not yet been
     * flushed to the database.
     * We detect this by comparing the date of the changeNumber where
     * we want to start with the date of the first ChangeNumber
     * of the msgQueue.
     * If this is the case we flush the queue to the database.
     */
    flush();
    ChangeNumber recentChangeNumber = null;
    if (changeNumber == null)
      flush();
    synchronized (msgQueue)
    {
      try
      {
        UpdateMessage msg = msgQueue.getFirst();
        recentChangeNumber = msg.getChangeNumber();
      }
      catch (NoSuchElementException e)
      {}
    }
    if ( (recentChangeNumber != null) &&
         (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
    {
      flush();
    }
    return new ChangelogIterator(serverId, db, changeNumber);
  }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -164,6 +164,45 @@
    return broker;
  }
  /**
   * Open a changelog session with flow control to the local Changelog server.
   *
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          broker.receive();
        }
      }
      catch (Exception e)
      { }
    }
    return broker;
  }
  /**
   * suppress all the entries created by the tests in this class
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,10 +26,11 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
@@ -375,11 +376,25 @@
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * updates from a single LDAP server to a number of LDAP servers.
   *
   * This test i sconfigured by a relatively low stress
   * but can be changed using TOTAL_MSG and CLIENT_THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  public void stressFromBrokertoChangelog() throws Exception
  @Test(enabled=true, groups="slow")
  public void oneWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    int TOTAL_MSG = 1000;     // number of messages to send during the test
    int CLIENT_THREADS = 2;   // number of threads that will try to read
                              // the messages
    ChangeNumberGenerator gen =
      new ChangeNumberGenerator((short)5 , (long) 0);
    BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
    ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS];
    try
    {
@@ -388,24 +403,112 @@
       */
      server = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          1000, true);
          1000, 1000, 0, true);
      BrokerReader reader = new BrokerReader(server);
      /*
       * Start the client threads.
       */
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i] = openChangelogSession(
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
            1000, true);
        client[i] = new BrokerReader(clientBroker[i]);
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        client[i].start();
      }
      reader.start();
      ChangeNumberGenerator gen =
        new ChangeNumberGenerator((short)5 , (long) 0);
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      for (int i = 0; i< 100000; i++)
      for (int i = 0; i< TOTAL_MSG; i++)
      {
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
          "uid");
        server.publish(msg);
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        client[i].join();
        reader.join();
      }
    }
    finally
    {
      if (server != null)
        server.stop();
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i].stop();
      }
    }
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   *
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * updates from multiple LDAP server to a number of LDAP servers.
   *
   * This test is sconfigured for a relatively low stress
   * but can be changed using TOTAL_MSG and THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  public void multipleWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    final int TOTAL_MSG = 1000;   // number of messages to send during the test
    final int THREADS = 2;       // number of threads that will produce
                               // and read the messages.
    BrokerWriter producer[] = new BrokerWriter[THREADS];
    BrokerReader reader[] = new BrokerReader[THREADS];
    try
    {
      /*
       * Start the producer threads.
       */
      for (int i =0; i< THREADS; i++)
      {
        short serverId = (short) (10+i);
        ChangeNumberGenerator gen =
          new ChangeNumberGenerator(serverId , (long) 0);
        ChangelogBroker broker =
          openChangelogSession( DN.decode("dc=example,dc=com"), serverId,
            100, changelogPort, 1000, 1000, 0, true);
        producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
        reader[i] = new BrokerReader(broker);
      }
      for (int i =0; i< THREADS; i++)
      {
        producer[i].start();
      }
      for (int i =0; i< THREADS; i++)
      {
        reader[i].start();
      }
      for (int i =0; i< THREADS; i++)
      {
        producer[i].join();
        reader[i].join();
      }
    }
    finally
    {
@@ -414,57 +517,11 @@
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
        }
      } catch (Exception e) {
      }
    }
  }
  /**
   * Chaining tests of the changelog code with 2 changelog servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   *
   * Test 1
   * - Create changelog server 1
   * - Create changelog server 2 connected with changelog server 1
@@ -472,7 +529,7 @@
   * - Create and connect client 2 to changelog server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   *
   * Test 2
   * - Create changelog server 1
   * - Create and connect client1 to changelog server 1
@@ -480,7 +537,7 @@
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Check that client 2 receives the changes published by client 1
   *
   *
   */
  @Test(enabled=true)
  public void changelogChaining() throws Exception
@@ -520,8 +577,8 @@
        String changelogLdif = "dn: cn=Changelog Server\n"
          + "objectClass: top\n"
          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
@@ -535,17 +592,17 @@
      try
      {
        // For itest=0, create and connect client1 to changelog1
        // For itest=0, create and connect client1 to changelog1
        //              and client2 to changelog2
        // For itest=1, only create and connect client1 to changelog1
        // For itest=1, only create and connect client1 to changelog1
        //              client2 will be created later
        broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        if (itest == 0)
        {
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
             brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
        }
        // - Test messages between clients by publishing now
@@ -553,7 +610,7 @@
        // - Delete
        long time = TimeThread.getTime();
        int ts = 1;
        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ChangeNumber cn = new ChangeNumber(time, ts++, brokerIds[0]);
        DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
        broker1.publish(delMsg);
@@ -566,7 +623,7 @@
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
            .getAttributes(), new ArrayList<Attribute>());
@@ -577,13 +634,13 @@
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        ModifyMsg modMsg = new ModifyMsg(cn, DN
            .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
            .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
            null);
@@ -598,9 +655,9 @@
          String changelogLdif = "dn: cn=Changelog Server\n"
            + "objectClass: top\n"
            + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
            + "cn: Changelog Server\n"
            + "cn: Changelog Server\n"
            + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
          Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
          ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
@@ -608,7 +665,7 @@
          // Connect broker 2 to changelog2
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
              brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
        }
        // - Check msg receives by broker, through changeLog2
@@ -658,7 +715,7 @@
          }
        }
        // Check that everything expected has been received
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
            + " expected messages: #msg received " + ts);
      }
      finally
@@ -674,4 +731,94 @@
      }
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
  /**
   * This class allows to creater reader thread.
   * They continuously reads messages from a changelog broker until
   * there is nothing left.
   * They Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
          }
      } catch (Exception e) {
      }
    }
  }
  /**
   * This class allows to create writer thread that can
   * be used as producers for the Changelog stress tests.
   */
  private class BrokerWriter extends Thread
  {
    int count;
    private ChangelogBroker broker;
    ChangeNumberGenerator gen;
    public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen,
        int count)
    {
      this.broker = broker;
      this.count = count;
      this.gen = gen;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      while (count>0)
      {
        count--;
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
              "uid");
        broker.publish(msg);
      }
    }
  }
}