mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
06.49.2009 93048f8743d67e53bfa6ddc65fc4c5b1d867cd39
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -569,7 +569,7 @@
    ReplicationBroker server = null;
    BrokerReader reader = null;
    int TOTAL_MSG = 1000;     // number of messages to send during the test
    int CLIENT_THREADS = 2;   // number of threads that will try to read
    int CLIENT_THREADS = 3;   // number of threads that will try to read
                              // the messages
    ChangeNumberGenerator gen =
      new ChangeNumberGenerator((short)5 , (long) 0);
@@ -584,11 +584,11 @@
       */
      server = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING), (short) 5, 100, replicationServerPort,
          1000, 1000, 0, true);
          100000, 1000, 0, false);
      assertTrue(server.isConnected());
      reader = new BrokerReader(server);
      reader = new BrokerReader(server, TOTAL_MSG);
      /*
       * Start the client threads.
@@ -599,7 +599,7 @@
            DN.decode(TEST_ROOT_DN_STRING), (short) (100+i), 100, replicationServerPort,
            1000, true);
        assertTrue(clientBroker[i].isConnected());
        client[i] = new BrokerReader(clientBroker[i]);
        client[i] = new BrokerReader(clientBroker[i], TOTAL_MSG);
      }
      for (int i =0; i< CLIENT_THREADS; i++)
@@ -624,13 +624,21 @@
    finally
    {
      if (reader != null)
        reader.join();
      {
        reader.join(10000);
      }
      if (server != null)
      {
        server.stop();
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        if (client[i] != null)
          client[i].join();
        {
          client[i].join(10000);
          client[i].interrupt();
        }
      }
      for (int i =0; i< CLIENT_THREADS; i++)
      {
@@ -681,12 +689,12 @@
          new ChangeNumberGenerator(serverId , (long) 0);
        broker[i] =
          openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), serverId,
            100, replicationServerPort, 1000, 1000, 0, true);
            100, replicationServerPort, 100000, 1000, 0, false);
        assertTrue(broker[i].isConnected());
        producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS);
        reader[i] = new BrokerReader(broker[i]);
        reader[i] = new BrokerReader(broker[i], (TOTAL_MSG/THREADS)*(THREADS-1));
      }
      for (int i =0; i< THREADS; i++)
@@ -705,12 +713,16 @@
      for (int i = 0; i< THREADS; i++)
      {
        if (producer[i] != null)
          producer[i].join();
          producer[i].join(10000);
        // kill the thread in case it is not yet stopped.
        producer[i].interrupt();
      }
      for (int i = 0; i< THREADS; i++)
      {
        if (reader[i] != null)
          reader[i].join();
          reader[i].join(10000);
        // kill the thread in case it is not yet stopped.
        reader[i].interrupt();
      }
      for (int i = 0; i< THREADS; i++)
      {
@@ -1112,14 +1124,16 @@
  private class BrokerReader extends Thread
  {
    private ReplicationBroker broker;
    private int numMsgRcv = 0;
    private final int numMsgExpected;
    /**
     * Creates a new Stress Test Reader.
     *
     * @param broker         the replication broker stored by this reader;
     *                       the reader's receive loop calls
     *                       {@code receive()} on it
     * @param numMsgExpected the number of messages this reader expects;
     *                       the receive loop counts each {@code UpdateMsg}
     *                       and stops once the count reaches this value
     */
    public BrokerReader(ReplicationBroker broker)
    public BrokerReader(ReplicationBroker broker, int numMsgExpected)
    {
      this.broker = broker;
      this.numMsgExpected = numMsgExpected;
    }
    /**
@@ -1135,11 +1149,23 @@
        while (true)
        {
          ReplicationMsg msg = broker.receive();
          broker.updateWindowAfterReplay();
          if (msg == null)
          if (msg instanceof UpdateMsg)
          {
            numMsgRcv++;
            broker.updateWindowAfterReplay();
          }
          if ((msg == null) || (numMsgRcv >= numMsgExpected))
            break;
          }
      } catch (Exception e) {
      } catch (SocketTimeoutException e)
      {
        assertTrue((numMsgRcv == numMsgExpected),
            "a BrokerReader did not received the expected message number :"
            + numMsgRcv + " " + numMsgExpected);
      } catch (Exception e)
      {
        assertTrue(false,
            "a BrokerReader received an Exception" + e.getMessage());
      }
    }
  }
@@ -1294,7 +1320,6 @@
   private Entry createExportAllTask()
   throws Exception
   {
     String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
     String path = "exportLDIF.ldif";
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
@@ -1311,7 +1336,6 @@
   throws Exception
   {
     String root = suffix.substring(suffix.indexOf('=')+1, suffix.indexOf(','));
     String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
     String path = "exportLDIF" + root +".ldif";
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",