mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 f73b655466092169abac34833fb628fce1fcdebe
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -37,11 +37,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.asn1.ASN1OctetString;
@@ -52,7 +54,7 @@
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.DN;
@@ -70,6 +72,7 @@
import org.opends.server.types.Attribute;
import static org.opends.server.TestCaseUtils.*;
import static org.testng.Assert.assertNotNull;
/**
 * Test the constructors, encoders and decoders of the Replication AckMsg,
@@ -88,12 +91,12 @@
   */
  protected Entry personEntry;
  private int replServerPort;
  // the base DN used for this test
  // the base DN used for this test
  private DN baseDn;
  private ReplicationServer replicationServer;
  /**
   * Test the window mechanism by :
   *  - creating a ReplicationServer service client using the ReplicationBroker class.
@@ -109,23 +112,37 @@
    logError(Message.raw(
        Category.SYNC, Severity.INFORMATION,
        "Starting Replication ProtocolWindowTest : saturateAndRestart"));
    // clear the Replication Server and the backend to isolate this test
    // from the other tests,
    TestCaseUtils.initializeTestBackend(true);
    replicationServer.clearDb();
    ReplicationBroker broker = openReplicationSession(baseDn, (short) 13,
    // suffix synchronized
    String testName = "protocolWindowTest";
    String synchroServerLdif =
      "dn: " + "cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + testName + "\n"
        + "ds-cfg-base-dn: " + TEST_ROOT_DN_STRING + "\n"
        + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
        + "ds-cfg-server-id: 1\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
    Entry repDomainEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    // Configure replication domain
    DirectoryServer.getConfigHandler().addEntry(repDomainEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(repDomainEntry.getDN()),
          "Unable to add the synchronized server");
    ReplicationBroker broker = openReplicationSession(baseDn, (short) 12,
        WINDOW_SIZE, replServerPort, 1000, true);
    try {
      /* Test that the monitoring information of the replicationServer and of
       * the synchro plugin publishes the correct window size.
       * This allows both the check the monitoring code and to test that
       * This allows both to check the monitoring code and to test that
       * configuration is working.
       */
      Thread.sleep(1500);
      Thread.sleep(2000);
      assertTrue(checkWindows(WINDOW_SIZE));
      assertTrue(checkChangelogQueueSize(REPLICATION_QUEUE_SIZE));
@@ -138,12 +155,11 @@
          tmp.getOperationalAttributes());
      addOp.run();
      assertEquals(addOp.getResultCode(), ResultCode.SUCCESS);
      entryList.addLast(personEntry.getDN());
      assertTrue(DirectoryServer.entryExists(personEntry.getDN()),
        "The Add Entry operation failed");
      // Check if the client has received the MSG
      ReplicationMessage msg = broker.receive();
      ReplicationMsg msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
        "The received Replication message is not an ADD msg");
      AddMsg addMsg =  (AddMsg) msg;
@@ -161,7 +177,7 @@
      processModify(count);
      // allow some time for the messages to reach the replicationServer client
      Thread.sleep(500);
      Thread.sleep(2000);
      // check that the replicationServer only sent WINDOW_SIZE messages
      searchUpdateSent();
@@ -186,6 +202,26 @@
    finally {
      broker.stop();
      DirectoryServer.deregisterMonitorProvider(REPLICATION_STRESS_TEST);
      // Clean domain
      DN dn = repDomainEntry.getDN();
      try
      {
        DeleteOperationBasis op = new DeleteOperationBasis(connection,
          InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(), null, dn);
        op.run();
        if ((op.getResultCode() != ResultCode.SUCCESS) &&
          (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
        {
          logError(Message.raw(Category.SYNC, Severity.NOTICE,
          "saturateQueueAndRestart: error cleaning config entry: " + dn));
        }
      } catch (NoSuchElementException e)
      {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
          "saturateQueueAndRestart: error cleaning config entry: " + dn));
      }
      replicationServer.clearDb();
    }
  }
@@ -206,7 +242,7 @@
  }
  /**
   * Check that the window configuration has been successfull
   * Check that the window configuration has been successful
   * by reading the monitoring information and checking
   * that we do have 2 entries with the configured max-rcv-window.
   */
@@ -234,7 +270,7 @@
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    assertEquals(op.getEntriesSent(), 1,
    assertEquals(op.getEntriesSent(), 1,
        "Entries#=" + op.getEntriesSent());
    op = connection.processSearch(
@@ -243,7 +279,7 @@
        LDAPFilter.decode("(missing-changes=" +
            (REPLICATION_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    Iterator<SearchResultEntry> entriesit = op.getSearchEntries().iterator();
    while(entriesit.hasNext())
    {
@@ -252,8 +288,8 @@
      while (attit.hasNext())
      {
        Attribute attr = attit.next();
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        e.getDN() + "= " + attr.getName() + " " + attr.getValues().iterator()
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        e.getDN() + "= " + attr.getName() + " " + attr.iterator()
        .next().getStringValue()));
      }
    }
@@ -271,20 +307,10 @@
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    super.setUp();
    baseDn = DN.decode(TEST_ROOT_DN_STRING);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
    // top level synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
        + synchroStringDN;
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    replServerPort = socket.getLocalPort();
@@ -292,21 +318,8 @@
    // configure the replication Server.
    replicationServer = new ReplicationServer(new ReplServerFakeConfiguration(
        replServerPort, "changelogDbReplWindowTest", 0,
        replServerPort, "protocolWindowTestDb", 0,
        1, REPLICATION_QUEUE_SIZE, WINDOW_SIZE, null));
    // suffix synchronized
    String synchroServerLdif =
      "dn: " + "cn=example, cn=domains, " + synchroPluginStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: example\n"
        + "ds-cfg-base-dn: " + TEST_ROOT_DN_STRING + "\n"
        + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
        + "ds-cfg-server-id: 1\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.windowTest," + TEST_ROOT_DN_STRING + "\n"
        + "objectClass: top\n" + "objectClass: person\n"
@@ -323,21 +336,6 @@
        + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureReplication();
  }
  /**
   * Clean up the environment: delegates to the parent class clean-up, then
   * shuts down the replication server used by this test.
   *
   * @throws Exception
   *           If the environment could not be cleaned up.
   */
  @AfterClass
  public void classCleanUp() throws Exception
  {
    super.classCleanUp();
    replicationServer.shutdown();
  }
  private void processModify(int count)
@@ -362,37 +360,63 @@
        Category.SYNC, Severity.INFORMATION,
        "Starting Replication ProtocolWindowTest : protocolVersion"));
    // Test : Make a broker degrade its version when connecting to an old
    // replication server.
    ProtocolVersion.setCurrentVersion((short)2);
    ReplicationBroker broker = null;
    ReplicationBroker broker = new ReplicationBroker(
    try
    {
      // Test : Make a broker degrade its version when connecting to an old
      // replication server.
      ProtocolVersion.resetCurrentVersion();
      broker = new ReplicationBroker(null,
        new ServerState(),
        baseDn,
        (short) 13, 0, 0, 0, 0, 1000, 0,
        ReplicationTestCase.getGenerationId(baseDn),
        getReplSessionSecurity());
        getReplSessionSecurity(), (byte)1);
    // Check broker hard-coded version
    short pversion = broker.getProtocolVersion();
    assertEquals(pversion, 2);
      // Check broker hard-coded version
      short pversion = broker.getProtocolVersion();
      assertEquals(pversion, ProtocolVersion.getCurrentVersion());
    // Connect the broker to the replication server
    ProtocolVersion.setCurrentVersion((short)0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + replServerPort);
    broker.start(servers);
    TestCaseUtils.sleep(100); // wait for connection established
      // Connect the broker to the replication server
      ProtocolVersion.setCurrentVersion(ProtocolVersion.REPLICATION_PROTOCOL_V1);
      ArrayList<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      broker.start(servers);
      TestCaseUtils.sleep(3000); // wait for connection established
    // Check broker negotiated version
    pversion = broker.getProtocolVersion();
    assertEquals(pversion, 0);
      // Check broker negotiated version
      pversion = broker.getProtocolVersion();
      assertEquals(pversion, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    broker.stop();
    logError(Message.raw(
      logError(Message.raw(
        Category.SYNC, Severity.INFORMATION,
        "Ending Replication ProtocolWindowTest : protocolVersion"));
    } finally
    {
      if (broker != null)
        broker.stop();
      ProtocolVersion.resetCurrentVersion();
    }
  }
  /**
   * Clean up the environment: runs the parent class clean-up, removes the
   * replication server used by this test, then runs the paranoia check.
   *
   * @throws Exception If the environment could not be cleaned up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
    // Clear the flag before delegating; presumably this keeps the parent
    // clean-up from running the paranoia check itself -- TODO confirm
    // against the parent class.
    callParanoiaCheck = false;
    super.classCleanUp();
    replicationServer.remove();
    // Run the check explicitly, after the replication server has been removed.
    paranoiaCheck();
  }
}