From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508  These changes implement a window mechanism in the sycnhronization protocol.

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java |  153 +++++++++++++++++++++++++++------------------------
 1 files changed, 81 insertions(+), 72 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index d553104..f73ca4b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
 
 package org.opends.server.synchronization;
 
+import static org.opends.server.loggers.Error.logError;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
   @Test(enabled=true, groups="slow")
   public void fromServertoBroker() throws Exception
   {
-
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting Synchronization StressTest : fromServertoBroker" , 1);
+    
     final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
     final int TOTAL_MESSAGES = 1000;
     cleanEntries();
 
-    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
     DirectoryServer.registerMonitorProvider(this);
 
     try {
-    /*
-     * loop receiving update until there is nothing left
-     * to make sure that message from previous tests have been consumed.
-     */
-    try
-    {
-      while (true)
+      /*
+       * loop receiving update until there is nothing left
+       * to make sure that message from previous tests have been consumed.
+       */
+      try
       {
-        broker.receive();
+        while (true)
+        {
+          broker.receive();
+        }
       }
-     }
-    catch (Exception e)
-    { }
-    /*
-     * Test that operations done on this server are sent to the
-     * changelog server and forwarded to our changelog broker session.
-     */
+      catch (Exception e)
+      { }
+      /*
+       * Test that operations done on this server are sent to the
+       * changelog server and forwarded to our changelog broker session.
+       */
 
-    // Create an Entry (add operation) that will be later used in the test.
-    Entry tmp = personEntry.duplicate();
-    AddOperation addOp = new AddOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-        .nextMessageID(), null, tmp.getDN(),
-        tmp.getObjectClasses(), tmp.getUserAttributes(),
-        tmp.getOperationalAttributes());
-    addOp.run();
-    entryList.add(personEntry);
-    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
-      "The Add Entry operation failed");
+      // Create an Entry (add operation) that will be later used in the test.
+      Entry tmp = personEntry.duplicate();
+      AddOperation addOp = new AddOperation(connection,
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+          .nextMessageID(), null, tmp.getDN(),
+          tmp.getObjectClasses(), tmp.getUserAttributes(),
+          tmp.getOperationalAttributes());
+      addOp.run();
+      entryList.add(personEntry);
+      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+        "The Add Entry operation failed");
 
-    // Check if the client has received the msg
-    SynchronizationMessage msg = broker.receive();
-    assertTrue(msg instanceof AddMsg,
-      "The received synchronization message is not an ADD msg");
-    AddMsg addMsg =  (AddMsg) msg;
+      // Check if the client has received the msg
+      SynchronizationMessage msg = broker.receive();
+      assertTrue(msg instanceof AddMsg,
+        "The received synchronization message is not an ADD msg");
+      AddMsg addMsg =  (AddMsg) msg;
 
-    Operation receivedOp = addMsg.createOperation(connection);
-    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
-      "The received synchronization message is not an ADD msg");
+      Operation receivedOp = addMsg.createOperation(connection);
+      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+        "The received synchronization message is not an ADD msg");
 
-    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
-      "The received ADD synchronization message is not for the excepted DN");
+      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+        "The received ADD synchronization message is not for the excepted DN");
 
-    reader = new BrokerReader(broker);
-    reader.start();
+      reader = new BrokerReader(broker);
+      reader.start();
 
-    long startTime = TimeThread.getTime();
-    int count = TOTAL_MESSAGES;
+      long startTime = TimeThread.getTime();
+      int count = TOTAL_MESSAGES;
 
-    // Create a number of writer thread that will loop modifying the entry
-    List<Thread> writerThreadList = new LinkedList<Thread>();
-    for (int n = 0; n < 1; n++)
-    {
-      BrokerWriter writer = new BrokerWriter(count);
-      writerThreadList.add(writer);
-    }
-    for (Thread thread : writerThreadList)
-    {
-      thread.start();
-    }
-    // wait for all the threads to finish.
-    for (Thread thread : writerThreadList)
-    {
-      thread.join();
-    }
+      // Create a number of writer thread that will loop modifying the entry
+      List<Thread> writerThreadList = new LinkedList<Thread>();
+      for (int n = 0; n < 1; n++)
+      {
+        BrokerWriter writer = new BrokerWriter(count);
+        writerThreadList.add(writer);
+      }
+      for (Thread thread : writerThreadList)
+      {
+        thread.start();
+      }
+      // wait for all the threads to finish.
+      for (Thread thread : writerThreadList)
+      {
+        thread.join();
+      }
 
-    long afterSendTime = TimeThread.getTime();
+      long afterSendTime = TimeThread.getTime();
 
-    int rcvCount = reader.getCount();
-    long afterReceiveTime = TimeThread.getTime();
+      int rcvCount = reader.getCount();
+      
+      long afterReceiveTime = TimeThread.getTime();
 
-    if (rcvCount != TOTAL_MESSAGES)
-    {
-      fail("some messages were lost : expected : " +TOTAL_MESSAGES +
-           " received : " + rcvCount);
-    }
+      if (rcvCount != TOTAL_MESSAGES)
+      {
+        fail("some messages were lost : expected : " +TOTAL_MESSAGES +
+            " received : " + rcvCount);
+      }
 
     }
     finally {
-    DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
-    broker.stop();
+      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+      broker.stop();
     }
   }
 
@@ -393,7 +400,7 @@
     ServerState state = new ServerState(baseDn);
     state.loadState();
     ChangelogBroker broker = new ChangelogBroker(state, baseDn,
-                                                 serverId, 0, 0, 0, 0);
+                                                 serverId, 0, 0, 0, 0, 100);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:8989");
     broker.start(servers);
@@ -441,7 +448,7 @@
     // We also have a replicated suffix (synchronization domain)
     DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
     assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
-        "Unable to add the syncrhonized server");
+        "Unable to add the synchronized server");
     entryList.add(synchroServerEntry);
   }
 
@@ -553,7 +560,9 @@
       {
         while (true)
         {
-          broker.receive();
+          SynchronizationMessage msg = broker.receive();
+          if (msg == null)
+            break;
           count ++;
         }
       } catch (Exception e) {
@@ -577,7 +586,7 @@
           return count;
         try
         {
-          this.wait();
+          this.wait(60);
           return count;
         } catch (InterruptedException e)
         {

--
Gitblit v1.10.0