mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
29.21.2007 b3112742902d0145779f9ac094857c0a3e9b1d79
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -328,7 +328,7 @@
      List<String> unacceptableReasons)
  {
    // TODO NYI
    return false;
    return true;
  }
  /**
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -35,6 +35,7 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -206,11 +207,37 @@
                           throws DatabaseException, Exception
  {
    /*
     * make sure to flush some changes in the database so that
     * we don't create the iterator on an empty database when the
     * dbHandler has just been started.
     * When we create an iterator we need to make sure that we
     * don't miss some changes because the iterator is created
     * close to the limit of the changed that have not yet been
     * flushed to the database.
     * We detect this by comparing the date of the changeNumber where
     * we want to start with the date of the first ChangeNumber
     * of the msgQueue.
     * If this is the case we flush the queue to the database.
     */
    flush();
    ChangeNumber recentChangeNumber = null;
    if (changeNumber == null)
      flush();
    synchronized (msgQueue)
    {
      try
      {
        UpdateMessage msg = msgQueue.getFirst();
        recentChangeNumber = msg.getChangeNumber();
      }
      catch (NoSuchElementException e)
      {}
    }
    if ( (recentChangeNumber != null) &&
         (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
    {
      flush();
    }
    return new ChangelogIterator(serverId, db, changeNumber);
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -164,6 +164,45 @@
    return broker;
  }
  /**
   * Open a changelog session with flow control to the local Changelog server.
   *
   */
  protected ChangelogBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    if (emptyOldChanges)
      state.loadState();
    ChangelogBroker broker = new ChangelogBroker(
        state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          broker.receive();
        }
      }
      catch (Exception e)
      { }
    }
    return broker;
  }
  /**
   * suppress all the entries created by the tests in this class
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,10 +26,11 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
@@ -375,11 +376,25 @@
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * updates from a single LDAP server to a number of LDAP servers.
   *
   * This test i sconfigured by a relatively low stress
   * but can be changed using TOTAL_MSG and CLIENT_THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  public void stressFromBrokertoChangelog() throws Exception
  @Test(enabled=true, groups="slow")
  public void oneWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    int TOTAL_MSG = 1000;     // number of messages to send during the test
    int CLIENT_THREADS = 2;   // number of threads that will try to read
                              // the messages
    ChangeNumberGenerator gen =
      new ChangeNumberGenerator((short)5 , (long) 0);
    BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
    ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS];
    try
    {
@@ -388,24 +403,112 @@
       */
      server = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          1000, true);
          1000, 1000, 0, true);
      BrokerReader reader = new BrokerReader(server);
      /*
       * Start the client threads.
       */
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i] = openChangelogSession(
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
            1000, true);
        client[i] = new BrokerReader(clientBroker[i]);
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        client[i].start();
      }
      reader.start();
      ChangeNumberGenerator gen =
        new ChangeNumberGenerator((short)5 , (long) 0);
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      for (int i = 0; i< 100000; i++)
      for (int i = 0; i< TOTAL_MSG; i++)
      {
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
          "uid");
        server.publish(msg);
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        client[i].join();
        reader.join();
      }
    }
    finally
    {
      if (server != null)
        server.stop();
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i].stop();
      }
    }
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   *
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * updates from multiple LDAP server to a number of LDAP servers.
   *
   * This test is sconfigured for a relatively low stress
   * but can be changed using TOTAL_MSG and THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  public void multipleWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    final int TOTAL_MSG = 1000;   // number of messages to send during the test
    final int THREADS = 2;       // number of threads that will produce
                               // and read the messages.
    BrokerWriter producer[] = new BrokerWriter[THREADS];
    BrokerReader reader[] = new BrokerReader[THREADS];
    try
    {
      /*
       * Start the producer threads.
       */
      for (int i =0; i< THREADS; i++)
      {
        short serverId = (short) (10+i);
        ChangeNumberGenerator gen =
          new ChangeNumberGenerator(serverId , (long) 0);
        ChangelogBroker broker =
          openChangelogSession( DN.decode("dc=example,dc=com"), serverId,
            100, changelogPort, 1000, 1000, 0, true);
        producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
        reader[i] = new BrokerReader(broker);
      }
      for (int i =0; i< THREADS; i++)
      {
        producer[i].start();
      }
      for (int i =0; i< THREADS; i++)
      {
        reader[i].start();
      }
      for (int i =0; i< THREADS; i++)
      {
        producer[i].join();
        reader[i].join();
      }
    }
    finally
    {
@@ -414,57 +517,11 @@
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
        }
      } catch (Exception e) {
      }
    }
  }
  /**
   * Chaining tests of the changelog code with 2 changelog servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   *
   * Test 1
   * - Create changelog server 1
   * - Create changelog server 2 connected with changelog server 1
@@ -472,7 +529,7 @@
   * - Create and connect client 2 to changelog server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   *
   * Test 2
   * - Create changelog server 1
   * - Create and connect client1 to changelog server 1
@@ -480,7 +537,7 @@
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Check that client 2 receives the changes published by client 1
   *
   *
   */
  @Test(enabled=true)
  public void changelogChaining() throws Exception
@@ -520,8 +577,8 @@
        String changelogLdif = "dn: cn=Changelog Server\n"
          + "objectClass: top\n"
          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
@@ -535,17 +592,17 @@
      try
      {
        // For itest=0, create and connect client1 to changelog1
        // For itest=0, create and connect client1 to changelog1
        //              and client2 to changelog2
        // For itest=1, only create and connect client1 to changelog1
        // For itest=1, only create and connect client1 to changelog1
        //              client2 will be created later
        broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        if (itest == 0)
        {
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
             brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
        }
        // - Test messages between clients by publishing now
@@ -553,7 +610,7 @@
        // - Delete
        long time = TimeThread.getTime();
        int ts = 1;
        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ChangeNumber cn = new ChangeNumber(time, ts++, brokerIds[0]);
        DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
        broker1.publish(delMsg);
@@ -566,7 +623,7 @@
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
            .getAttributes(), new ArrayList<Attribute>());
@@ -577,13 +634,13 @@
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        ModifyMsg modMsg = new ModifyMsg(cn, DN
            .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        cn = new ChangeNumber(time, ts++, brokerIds[0]);
        ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
            .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
            null);
@@ -598,9 +655,9 @@
          String changelogLdif = "dn: cn=Changelog Server\n"
            + "objectClass: top\n"
            + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
            + "cn: Changelog Server\n"
            + "cn: Changelog Server\n"
            + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
          Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
          ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
@@ -608,7 +665,7 @@
          // Connect broker 2 to changelog2
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
              brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
        }
        // - Check msg receives by broker, through changeLog2
@@ -658,7 +715,7 @@
          }
        }
        // Check that everything expected has been received
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
            + " expected messages: #msg received " + ts);
      }
      finally
@@ -674,4 +731,94 @@
      }
    }
  }
  /**
   * After the tests stop the changelog server.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
  }
  /**
   * This class allows to creater reader thread.
   * They continuously reads messages from a changelog broker until
   * there is nothing left.
   * They Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
          }
      } catch (Exception e) {
      }
    }
  }
  /**
   * This class allows to create writer thread that can
   * be used as producers for the Changelog stress tests.
   */
  private class BrokerWriter extends Thread
  {
    int count;
    private ChangelogBroker broker;
    ChangeNumberGenerator gen;
    public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen,
        int count)
    {
      this.broker = broker;
      this.count = count;
      this.gen = gen;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       */
      while (count>0)
      {
        count--;
        DeleteMsg msg =
          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
              "uid");
        broker.publish(msg);
      }
    }
  }
}