mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.59.2013 cbf6bfd149ce305652be0aac68d210778b5cbba6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,10 +28,10 @@
package org.opends.server.replication;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -42,7 +42,6 @@
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -55,6 +54,7 @@
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.types.SearchScope.*;
import static org.testng.Assert.*;
/**
@@ -70,7 +70,7 @@
  /**
   * A "person" entry
   */
  protected Entry personEntry;
  private Entry personEntry;
  private int replServerPort;
@@ -112,7 +112,7 @@
    DirectoryServer.getConfigHandler().addEntry(repDomainEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(repDomainEntry.getDN()),
          "Unable to add the synchronized server");
    configEntryList.add(repDomainEntry.getDN());
    configEntriesToCleanup.add(repDomainEntry.getDN());
    ReplicationBroker broker = openReplicationSession(baseDN, 12,
        WINDOW_SIZE, replServerPort, 1000, true);
@@ -125,8 +125,8 @@
       * configuration is working.
       */
      Thread.sleep(2000);
      assertTrue(checkWindows(WINDOW_SIZE));
      assertTrue(checkChangelogQueueSize(REPLICATION_QUEUE_SIZE));
      assertEquals(checkWindows(WINDOW_SIZE), 3);
      assertEquals(checkChangelogQueueSize(REPLICATION_QUEUE_SIZE), 2);
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate(false);
@@ -142,8 +142,7 @@
      // Check if the client has received the MSG
      ReplicationMsg msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
        "The received Replication message is not an ADD msg");
      Assertions.assertThat(msg).isInstanceOf(AddMsg.class);
      AddMsg addMsg =  (AddMsg) msg;
      Operation receivedOp = addMsg.createOperation(connection);
@@ -206,20 +205,20 @@
    }
  }
  private int searchNbMonitorEntries(String filterString) throws Exception
  {
    InternalSearchOperation op = connection.processSearch("cn=monitor", WHOLE_SUBTREE, filterString);
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return op.getEntriesSent();
  }
  /**
   * Check that the ReplicationServer queue size has correctly been configured
   * by reading the monitoring information.
   * <p>
   * NOTE(review): the lines below show two variants of this method
   * interleaved: one returning a boolean that runs the search inline and
   * compares the number of entries sent with 2, and one returning the int
   * obtained from searchNbMonitorEntries(). Confirm which variant applies
   * before relying on this span.
   *
   * @param changelog_queue_size
   *          the value used in the "(max-waiting-changes=...)" search filter
   * @throws LDAPException
   *           declared by the boolean-returning signature; the int-returning
   *           signature declares Exception instead
   */
  private boolean checkChangelogQueueSize(int changelog_queue_size)
          throws LDAPException
  private int checkChangelogQueueSize(int changelog_queue_size) throws Exception
  {
    // Inline variant: whole-subtree search under cn=monitor for entries whose
    // max-waiting-changes attribute equals the requested queue size.
    InternalSearchOperation op = connection.processSearch(
        ByteString.valueOf("cn=monitor"),
        SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(
            "(max-waiting-changes=" +  changelog_queue_size + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    // Inline variant result: true only when exactly 2 entries were sent.
    return (op.getEntriesSent() == 2);
    // Helper variant result: the raw number of matching monitor entries.
    return searchNbMonitorEntries("(max-waiting-changes=" + changelog_queue_size + ")");
  }
  /**
@@ -227,14 +226,9 @@
   * by reading the monitoring information and checking
   * that we do have 3 entries with the configured max-rcv-window.
   */
  private boolean checkWindows(int windowSize) throws LDAPException
  private int checkWindows(int windowSize) throws Exception
  {
    // NOTE(review): two variants of this method are interleaved here (a
    // boolean-returning one and an int-returning one); confirm which one
    // applies before relying on this span.

    // Inline variant: whole-subtree search under cn=monitor for entries whose
    // max-rcv-window attribute equals the given window size.
    InternalSearchOperation op = connection.processSearch(
        ByteString.valueOf("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(max-rcv-window=" + windowSize + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    // Inline variant result: true only when exactly 3 entries were sent.
    return (op.getEntriesSent() == 3);
    // Helper variant result: the raw number of matching monitor entries.
    return searchNbMonitorEntries("(max-rcv-window=" + windowSize + ")");
  }
  /**
@@ -245,36 +239,11 @@
   */
  private void searchUpdateSent() throws Exception
  {
    // NOTE(review): this body interleaves two variants of the same checks:
    // inline searches driven through the local "op", and calls to
    // searchNbMonitorEntries(). Confirm which variant is intended.

    // First check: exactly one monitor entry must report a sent-updates
    // value equal to WINDOW_SIZE.
    InternalSearchOperation op = connection.processSearch(
        ByteString.valueOf("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(sent-updates=" + WINDOW_SIZE + ")"));
    assertEquals(searchNbMonitorEntries("(sent-updates=" + WINDOW_SIZE + ")"), 1);
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    assertEquals(op.getEntriesSent(), 1,
        "Entries#=" + op.getEntriesSent());

    // Second check: exactly one monitor entry must report a missing-changes
    // value equal to REPLICATION_QUEUE_SIZE + WINDOW_SIZE.
    op = connection.processSearch(
        ByteString.valueOf("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(missing-changes=" +
            (REPLICATION_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);

    // Log, for every returned entry, each attribute name together with the
    // first value of that attribute.
    Iterator<SearchResultEntry> entriesit = op.getSearchEntries().iterator();
    while(entriesit.hasNext())
    {
      SearchResultEntry e = entriesit.next();
      Iterator<Attribute> attit = e.getAttributes().iterator();
      while (attit.hasNext())
      {
        Attribute attr = attit.next();
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        e.getDN() + "= " + attr.getName() + " " + attr.iterator()
        .next().getValue().toString()));
      }
    }
    assertEquals(op.getEntriesSent(), 1, "Entries#=" + op.getEntriesSent());
    // Helper-based form of the second check.
    final int nb = searchNbMonitorEntries(
        "(missing-changes=" + (REPLICATION_QUEUE_SIZE + WINDOW_SIZE) + ")");
    assertEquals(nb, 1);
  }
  /**