From 97c337fae8aa9247da4fd9ea3d7b9974887eb124 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 29 Jan 2007 17:21:53 +0000
Subject: [PATCH] - Fix issue 1160 : synchronization is flushing the msgQueue to the database too often The synchronization server is flushing all the queues of the messages received from a LDAP server each time a server needs to retrieve some changes that are not in memory anymore.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                             |    2 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java                             |   35 ++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java |   39 +++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java |  287 +++++++++++++++++++++++++++++++----------
 4 files changed, 288 insertions(+), 75 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 4761d06..2dd214f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -328,7 +328,7 @@
       List<String> unacceptableReasons)
   {
     // TODO NYI
-    return false;
+    return true;
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index 0fb35e6..90132ac 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -35,6 +35,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
@@ -206,11 +207,37 @@
                            throws DatabaseException, Exception
   {
     /*
-     * make sure to flush some changes in the database so that
-     * we don't create the iterator on an empty database when the
-     * dbHandler has just been started.
+     * When we create an iterator we need to make sure that we
+     * don't miss some changes because the iterator is created
+     * close to the limit of the changed that have not yet been
+     * flushed to the database.
+     * We detect this by comparing the date of the changeNumber where
+     * we want to start with the date of the first ChangeNumber
+     * of the msgQueue.
+     * If this is the case we flush the queue to the database.
      */
-    flush();
+    ChangeNumber recentChangeNumber = null;
+
+    if (changeNumber == null)
+      flush();
+
+    synchronized (msgQueue)
+    {
+      try
+      {
+        UpdateMessage msg = msgQueue.getFirst();
+        recentChangeNumber = msg.getChangeNumber();
+      }
+      catch (NoSuchElementException e)
+      {}
+    }
+
+    if ( (recentChangeNumber != null) &&
+         (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
+    {
+      flush();
+    }
+
     return new ChangelogIterator(serverId, db, changeNumber);
   }
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index fec54b9..1c59158 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -164,6 +164,45 @@
 
     return broker;
   }
+  
+  /**
+   * Open a changelog session with flow control to the local Changelog server.
+   *
+   */
+  protected ChangelogBroker openChangelogSession(
+      final DN baseDn, short serverId, int window_size,
+      int port, int timeout, int maxSendQueue, int maxRcvQueue,
+      boolean emptyOldChanges)
+          throws Exception, SocketException
+  {
+    PersistentServerState state = new PersistentServerState(baseDn);
+    if (emptyOldChanges)
+      state.loadState();
+    ChangelogBroker broker = new ChangelogBroker(
+        state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
+    ArrayList<String> servers = new ArrayList<String>(1);
+    servers.add("localhost:" + port);
+    broker.start(servers);
+    if (timeout != 0)
+      broker.setSoTimeout(timeout);
+    if (emptyOldChanges)
+    {
+      /*
+       * loop receiving update until there is nothing left
+       * to make sure that message from previous tests have been consumed.
+       */
+      try
+      {
+        while (true)
+        {
+          broker.receive();
+        }
+      }
+      catch (Exception e)
+      { }
+    }
+    return broker;
+  }
 
   /**
    * suppress all the entries created by the tests in this class
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 9639cea..743aa4a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,10 +26,11 @@
  */
 package org.opends.server.synchronization.changelog;
 
-import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import static org.opends.server.synchronization.protocol.OperationContext.*;
+
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
@@ -375,11 +376,25 @@
   /**
    * Stress test from client using the ChangelogBroker API
    * to the changelog server.
+   * This test allow to investigate the behaviour of the
+   * Changelog server when it needs to distribute the load of
+   * updates from a single LDAP server to a number of LDAP servers.
+   *
+   * This test i sconfigured by a relatively low stress
+   * but can be changed using TOTAL_MSG and CLIENT_THREADS consts.
    */
-  @Test(enabled=false, groups="slow")
-  public void stressFromBrokertoChangelog() throws Exception
+  @Test(enabled=true, groups="slow")
+  public void oneWriterMultipleReader() throws Exception
   {
     ChangelogBroker server = null;
+    int TOTAL_MSG = 1000;     // number of messages to send during the test
+    int CLIENT_THREADS = 2;   // number of threads that will try to read
+                              // the messages
+    ChangeNumberGenerator gen =
+      new ChangeNumberGenerator((short)5 , (long) 0);
+
+    BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
+    ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS];
 
     try
     {
@@ -388,24 +403,112 @@
        */
       server = openChangelogSession(
           DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
-          1000, true);
+          1000, 1000, 0, true);
 
       BrokerReader reader = new BrokerReader(server);
+
+      /*
+       * Start the client threads.
+       */
+      for (int i =0; i< CLIENT_THREADS; i++)
+      {
+        clientBroker[i] = openChangelogSession(
+            DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
+            1000, true);
+        client[i] = new BrokerReader(clientBroker[i]);
+      }
+
+      for (int i =0; i< CLIENT_THREADS; i++)
+      {
+        client[i].start();
+      }
       reader.start();
 
-      ChangeNumberGenerator gen =
-        new ChangeNumberGenerator((short)5 , (long) 0);
       /*
        * Simple loop creating changes and sending them
        * to the changelog server.
        */
-      for (int i = 0; i< 100000; i++)
+      for (int i = 0; i< TOTAL_MSG; i++)
       {
         DeleteMsg msg =
           new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
           "uid");
         server.publish(msg);
       }
+
+      for (int i =0; i< CLIENT_THREADS; i++)
+      {
+        client[i].join();
+        reader.join();
+      }
+    }
+    finally
+    {
+      if (server != null)
+        server.stop();
+      for (int i =0; i< CLIENT_THREADS; i++)
+      {
+        clientBroker[i].stop();
+      }
+    }
+  }
+
+  /**
+   * Stress test from client using the ChangelogBroker API
+   * to the changelog server.
+   *
+   * This test allow to investigate the behaviour of the
+   * Changelog server when it needs to distribute the load of
+   * updates from multiple LDAP server to a number of LDAP servers.
+   *
+   * This test is sconfigured for a relatively low stress
+   * but can be changed using TOTAL_MSG and THREADS consts.
+   */
+  @Test(enabled=false, groups="slow")
+  public void multipleWriterMultipleReader() throws Exception
+  {
+    ChangelogBroker server = null;
+    final int TOTAL_MSG = 1000;   // number of messages to send during the test
+    final int THREADS = 2;       // number of threads that will produce
+                               // and read the messages.
+
+    BrokerWriter producer[] = new BrokerWriter[THREADS];
+    BrokerReader reader[] = new BrokerReader[THREADS];
+
+    try
+    {
+      /*
+       * Start the producer threads.
+       */
+      for (int i =0; i< THREADS; i++)
+      {
+        short serverId = (short) (10+i);
+        ChangeNumberGenerator gen =
+          new ChangeNumberGenerator(serverId , (long) 0);
+        ChangelogBroker broker =
+          openChangelogSession( DN.decode("dc=example,dc=com"), serverId,
+            100, changelogPort, 1000, 1000, 0, true);
+
+        producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
+        reader[i] = new BrokerReader(broker);
+
+      }
+
+      for (int i =0; i< THREADS; i++)
+      {
+        producer[i].start();
+      }
+
+      for (int i =0; i< THREADS; i++)
+      {
+        reader[i].start();
+      }
+
+      for (int i =0; i< THREADS; i++)
+      {
+        producer[i].join();
+        reader[i].join();
+      }
     }
     finally
     {
@@ -414,57 +517,11 @@
     }
   }
 
-  /**
-   * After the tests stop the changelog server.
-   */
-  @AfterClass()
-  public void shutdown() throws Exception
-  {
-    if (changelog != null)
-      changelog.shutdown();
-  }
-  /**
-   * Continuously reads messages from a changelog broker until there is nothing
-   * left. Count the number of received messages.
-   */
-  private class BrokerReader extends Thread
-  {
-    private ChangelogBroker broker;
 
-    /**
-     * Creates a new Stress Test Reader
-     * @param broker
-     */
-    public BrokerReader(ChangelogBroker broker)
-    {
-      this.broker = broker;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void run()
-    {
-      // loop receiving messages until either we get a timeout
-      // because there is nothing left or an error condition happens.
-      try
-      {
-        while (true)
-        {
-          SynchronizationMessage msg = broker.receive();
-          if (msg == null)
-            break;
-        }
-      } catch (Exception e) {
-      }
-    }
-  }
-  
   /**
    * Chaining tests of the changelog code with 2 changelog servers involved
    * 2 tests are done here (itest=0 or itest=1)
-   * 
+   *
    * Test 1
    * - Create changelog server 1
    * - Create changelog server 2 connected with changelog server 1
@@ -472,7 +529,7 @@
    * - Create and connect client 2 to changelog server 2
    * - Make client1 publish changes
    * - Check that client 2 receives the changes published by client 1
-   * 
+   *
    * Test 2
    * - Create changelog server 1
    * - Create and connect client1 to changelog server 1
@@ -480,7 +537,7 @@
    * - Create changelog server 2 connected with changelog server 1
    * - Create and connect client 2 to changelog server 2
    * - Check that client 2 receives the changes published by client 1
-   * 
+   *
    */
   @Test(enabled=true)
   public void changelogChaining() throws Exception
@@ -520,8 +577,8 @@
         String changelogLdif = "dn: cn=Changelog Server\n"
           + "objectClass: top\n"
           + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
-          + "cn: Changelog Server\n" 
-          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n" 
+          + "cn: Changelog Server\n"
+          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
           + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
           + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
           + "ds-cfg-window-size: 100" + "\n"
@@ -535,17 +592,17 @@
 
       try
       {
-        // For itest=0, create and connect client1 to changelog1 
+        // For itest=0, create and connect client1 to changelog1
         //              and client2 to changelog2
-        // For itest=1, only create and connect client1 to changelog1 
+        // For itest=1, only create and connect client1 to changelog1
         //              client2 will be created later
         broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
-            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
 
         if (itest == 0)
         {
           broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
-              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
+             brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
         }
 
         // - Test messages between clients by publishing now
@@ -553,7 +610,7 @@
         // - Delete
         long time = TimeThread.getTime();
         int ts = 1;
-        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        ChangeNumber cn = new ChangeNumber(time, ts++, brokerIds[0]);
 
         DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
         broker1.publish(delMsg);
@@ -566,7 +623,7 @@
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
-        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        cn = new ChangeNumber(time, ts++, brokerIds[0]);
         AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
             .getAttributes(), new ArrayList<Attribute>());
@@ -577,13 +634,13 @@
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
-        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        cn = new ChangeNumber(time, ts++, brokerIds[0]);
         ModifyMsg modMsg = new ModifyMsg(cn, DN
             .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
         broker1.publish(modMsg);
 
         // - ModifyDN
-        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        cn = new ChangeNumber(time, ts++, brokerIds[0]);
         ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
             .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
             null);
@@ -598,9 +655,9 @@
           String changelogLdif = "dn: cn=Changelog Server\n"
             + "objectClass: top\n"
             + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
-            + "cn: Changelog Server\n" 
+            + "cn: Changelog Server\n"
             + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
-            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n" 
+            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
             + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
           Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
           ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
@@ -608,7 +665,7 @@
 
           // Connect broker 2 to changelog2
           broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
-              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+              brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
         }
 
         // - Check msg receives by broker, through changeLog2
@@ -658,7 +715,7 @@
           }
         }
         // Check that everything expected has been received
-        assertTrue(ts == 1, "Broker2 did not receive the complete set of" 
+        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
             + " expected messages: #msg received " + ts);
       }
       finally
@@ -674,4 +731,94 @@
       }
     }
   }
+
+  /**
+   * After the tests stop the changelog server.
+   */
+  @AfterClass()
+  public void shutdown() throws Exception
+  {
+    if (changelog != null)
+      changelog.shutdown();
+  }
+
+  /**
+   * This class allows to creater reader thread.
+   * They continuously reads messages from a changelog broker until
+   * there is nothing left.
+   * They Count the number of received messages.
+   */
+  private class BrokerReader extends Thread
+  {
+    private ChangelogBroker broker;
+
+    /**
+     * Creates a new Stress Test Reader
+     * @param broker
+     */
+    public BrokerReader(ChangelogBroker broker)
+    {
+      this.broker = broker;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run()
+    {
+      // loop receiving messages until either we get a timeout
+      // because there is nothing left or an error condition happens.
+      try
+      {
+        while (true)
+        {
+          SynchronizationMessage msg = broker.receive();
+          if (msg == null)
+            break;
+          }
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  /**
+   * This class allows to create writer thread that can
+   * be used as producers for the Changelog stress tests.
+   */
+  private class BrokerWriter extends Thread
+  {
+    int count;
+    private ChangelogBroker broker;
+    ChangeNumberGenerator gen;
+
+    public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen,
+        int count)
+    {
+      this.broker = broker;
+      this.count = count;
+      this.gen = gen;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run()
+    {
+      /*
+       * Simple loop creating changes and sending them
+       * to the changelog server.
+       */
+      while (count>0)
+      {
+        count--;
+
+        DeleteMsg msg =
+          new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
+              "uid");
+        broker.publish(msg);
+      }
+    }
+  }
 }

--
Gitblit v1.10.0