mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
24.31.2015 639a952f54128a6a0675eb7b95cd4b277ea338d6
opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,18 +22,22 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2015 ForgeRock AS
 *      Portions Copyright 2011-2016 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.assertj.core.api.Assertions;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
@@ -47,23 +51,45 @@
import org.opends.server.replication.plugin.DummyReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryConfig;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.HostPort;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.forgerock.opendj.ldap.ModificationType.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Tests for the replicationServer code.
 */
/** Tests for the replicationServer code. */
@SuppressWarnings("javadoc")
public class ReplicationServerTest extends ReplicationTestCase
{
@@ -84,13 +110,7 @@
  private CSN unknownCSNServer1;
  /**
   * Set up the environment for performing the tests in this Class.
   * Replication
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  /** Set up the environment for performing the tests in this Class. */
  @BeforeClass
  @Override
  public void setUp() throws Exception
@@ -103,9 +123,7 @@
    configure();
  }
  /**
   * Start the server and configure a replicationServer.
   */
  /** Start the server and configure a replicationServer. */
  private void configure() throws Exception
  {
    replicationServerPort = TestCaseUtils.findFreePort();
@@ -130,6 +148,7 @@
        }
      }
    }
    assertNotNull(replicationServer);
  }
  private void debugInfo(String s)
@@ -285,7 +304,7 @@
    if (csn == null) {
      return "";
    }
    if (csn.equals(firstCSNServer1))
    else if (csn.equals(firstCSNServer1))
    {
      return "firstCSNServer1";
    }
@@ -318,10 +337,7 @@
    return state;
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
   */
  /** Test that a new client see the change that was sent in the previous test. */
  private void newClient() throws Exception
  {
    debugInfo("Starting newClient");
@@ -341,10 +357,7 @@
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
   */
  /** Test that a client that has already seen some changes now receive the correct next change. */
  private void newClientWithChanges(ServerState state, CSN nextCSN) throws Exception
  {
    ReplicationBroker broker = null;
@@ -367,9 +380,7 @@
    }
  }
  /**
   * Asserts that the CSN for the passed in message matches the supplied CSN.
   */
  /** Asserts that the CSN for the passed in message matches the supplied CSN. */
  private void assertDeleteMsgCSNEquals(ReplicationMsg msg, CSN nextCSN, String msgNumber)
  {
    Assertions.assertThat(msg).isInstanceOf(DeleteMsg.class);
@@ -378,10 +389,7 @@
        + " message received by a new client was the wrong one.");
  }
  /**
   * Test that a client that has already seen the first change now see the
   * second change.
   */
  /** Test that a client that has already seen the first change now see the second change. */
  private void newClientWithFirstChanges() throws Exception
  {
    debugInfo("Starting newClientWithFirstChanges");
@@ -394,10 +402,7 @@
    debugInfo("Ending newClientWithFirstChanges");
  }
  /**
   * Test with a client that has already seen a Change that the
   * ReplicationServer has not seen.
   */
  /** Test with a client that has already seen a Change that the ReplicationServer has not seen. */
  private void newClientWithUnknownChanges() throws Exception
  {
    debugInfo("Starting newClientWithUnknownChanges");
@@ -419,7 +424,7 @@
  }
  /**
   * Test that a client that has already seen the first chaneg from server 2
   * Test that a client that has already seen the first change from server 2
   * now see the first change from server 1.
   */
  private void newClientWithChangefromServer2() throws Exception
@@ -430,10 +435,7 @@
    debugInfo("Ending newClientWithChangefromServer2");
  }
  /**
   * Test that a client that has not seen the second change from server 1
   * now receive it.
   */
  /** Test that a client that has not seen the second change from server 1 now receive it. */
  private void newClientLateServer1() throws Exception
  {
    debugInfo("Starting newClientLateServer1");
@@ -495,15 +497,11 @@
    try
    {
      /*
       * Open a sender session
       */
      /* Open a sender session */
      server = openReplicationSession(TEST_ROOT_DN, 5, 100, replicationServerPort, 100000);
      reader = new BrokerReader(server, TOTAL_MSG);
      /*
       * Start the client threads.
       */
      /* Start the client threads. */
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i] = openReplicationSession(TEST_ROOT_DN, 100+i, 100, replicationServerPort, 1000);
@@ -516,10 +514,7 @@
      }
      reader.start();
      /*
       * Simple loop creating changes and sending them
       * to the replicationServer.
       */
      /* Simple loop creating changes and sending them to the replicationServer. */
      for (int i = 0; i< TOTAL_MSG; i++)
      {
        server.publish(new DeleteMsg(EXAMPLE_DN, gen.newCSN(), "uid"));
@@ -571,9 +566,7 @@
    try
    {
      /*
       * Start the producer threads.
       */
      /* Start the producer threads. */
      for (int i = 0; i< THREADS; i++)
      {
        int serverId = 10 + i;
@@ -657,18 +650,10 @@
      int[] changelogIds = new int[] { 80, 81 };
      int[] brokerIds = new int[] { 100, 101 };
      // create the 2 connected replicationServer
      for (int i = 0; i < 2; i++)
      {
        changelogs[i] = null;
        // create the 2 connected replicationServer
        SortedSet<String> servers = new TreeSet<>();
        servers.add(
          "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestChangelogChainingDb"+i,
              0, changelogIds[i], 0, 100, servers);
        changelogs[i] = new ReplicationServer(conf);
        changelogs[i] = newReplicationServer(changelogPorts, changelogIds, i);
      }
      ReplicationBroker broker1 = null;
@@ -680,39 +665,23 @@
        broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[0], 1000);
        // - Test messages between clients by publishing now
        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
        // - Delete
        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
        DN dn = DN.valueOf("o=example" + 0 + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
        broker1.publish(delMsg);
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        Entry entry = TestCaseUtils.entryFromLdifString(
        "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
            entry.getAttributes(), new ArrayList<Attribute>());
        AddMsg addMsg = addMsg(csnGen);
        broker1.publish(addMsg);
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        ModifyMsg modMsg = modMsg(csnGen);
        broker1.publish(modMsg);
        // - ModifyDN
        ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
            EXAMPLE_DN, RDN.decode("o=example2"), true, null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
        ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
        ModifyDNMsg modDNMsg = modDNMsg(csnGen);
        broker1.publish(modDNMsg);
        // - Check msg receives by broker, through changeLog2
@@ -736,7 +705,7 @@
   * <li>Create replication server 2 connected with replication server 1</li>
   * <li>Create and connect client 2 to replication server 2</li>
   * <li>Check that client 2 receives the changes published by client 1</li>
   * <ol>.
   * <ol>
   */
  @Test(enabled = true)
  public void changelogChaining1() throws Exception
@@ -755,15 +724,8 @@
      int[] changelogIds = new int[] { 80, 81 };
      int[] brokerIds = new int[] { 100, 101 };
      {
        // create the 1rst replicationServer, the second one will be created later
        SortedSet<String> servers = new TreeSet<>();
        servers.add("localhost:" + changelogPorts[1]);
        ReplServerFakeConfiguration conf =
            new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb" + 0,
                                         0, changelogIds[0], 0, 100, servers);
        changelogs[0] = new ReplicationServer(conf);
      }
      // create the 1rst replicationServer, the second one will be created later
      changelogs[0] = newReplicationServer(changelogPorts, changelogIds, 0);
      ReplicationBroker broker1 = null;
@@ -773,47 +735,26 @@
        broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000);
        // - Test messages between clients by publishing now
        // - Delete
        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
        // - Delete
        DN dn = DN.valueOf("o=example" + 1 + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
        broker1.publish(delMsg);
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        String lentry = "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
            entry.getAttributes(), new ArrayList<Attribute>());
        AddMsg addMsg = addMsg(csnGen);
        broker1.publish(addMsg);
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        ModifyMsg modMsg = modMsg(csnGen);
        broker1.publish(modMsg);
        // - ModifyDN
        ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
            EXAMPLE_DN, RDN.decode("o=example2"), true, null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
        ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
        ModifyDNMsg modDNMsg = modDNMsg(csnGen);
        broker1.publish(modDNMsg);
        SortedSet<String> servers = new TreeSet<>();
        servers.add("localhost:"+changelogPorts[0]);
        ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
            changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
        changelogs[1] = new ReplicationServer(conf);
        changelogs[1] = newReplicationServer(changelogPorts, changelogIds, 1);
        // Connect broker 2 to changelog2
        broker2 = openReplicationSession(TEST_ROOT_DN,
@@ -832,6 +773,39 @@
    }
  }
  private ReplicationServer newReplicationServer(int[] changelogPorts, int[] changelogIds, int i)
      throws ConfigException
  {
    SortedSet<String> servers = newTreeSet("localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
    ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
        changelogPorts[i], "replicationServerTestChangelogChainingDb"+i,
        0, changelogIds[i], 0, 100, servers);
    return new ReplicationServer(conf);
  }
  private ModifyDNMsg modDNMsg(CSNGenerator csnGen) throws DirectoryException
  {
    ModifyDNOperationBasis op = new ModifyDNOperationBasis(
        connection, 1, 1, null, EXAMPLE_DN, RDN.decode("o=example2"), true, null);
    op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
    LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
    return new ModifyDNMsg(localOp);
  }
  private AddMsg addMsg(CSNGenerator csnGen) throws Exception
  {
    String user1entryUUID = "33333333-3333-3333-3333-333333333333";
    String baseUUID  = "22222222-2222-2222-2222-222222222222";
    Entry entry = TestCaseUtils.entryFromLdifString(
        "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryUUID: " + user1entryUUID + "\n");
    return new AddMsg(csnGen.newCSN(), EXAMPLE_DN, user1entryUUID, baseUUID,
        entry.getObjectClassAttribute(), entry.getAttributes(), new ArrayList<Attribute>());
  }
  private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected)
  {
    List<ReplicationMsg> msgs = new ArrayList<>(nbMessagesExpected);
@@ -860,10 +834,7 @@
    return msgs;
  }
  /**
   * Test that the Replication sends back correctly WindowsUpdate
   * when we send a WindowProbeMsg.
   */
  /** Test that the Replication sends back correctly WindowsUpdate when we send a WindowProbeMsg. */
  @Test(enabled = true)
  public void windowProbeTest() throws Exception
  {
@@ -967,9 +938,7 @@
    paranoiaCheck();
  }
  /**
   * After the tests stop the replicationServer.
   */
  /** After the tests stop the replicationServer. */
  private void shutdown() throws Exception
  {
    TestCaseUtils.dsconfig(
@@ -992,16 +961,13 @@
    private Exception exc;
    private String errDetails;
    /**
     * Creates a new Stress Test Reader.
     */
    /** Creates a new Stress Test Reader. */
    public BrokerReader(ReplicationBroker broker, int numMsgExpected)
    {
      this.broker = broker;
      this.numMsgExpected = numMsgExpected;
    }
    /** {@inheritDoc} */
    @Override
    public void run()
    {
@@ -1065,16 +1031,12 @@
      this.gen = gen;
    }
    /** {@inheritDoc} */
    @Override
    public void run()
    {
      debugInfo("writer " + broker.getServerId() + " starts to produce " + count);
      int ccount = count;
      /*
       * Simple loop creating changes and sending them
       * to the replicationServer.
       */
      /* Simple loop creating changes and sending them to the replicationServer. */
      while (count>0)
      {
        count--;
@@ -1138,9 +1100,8 @@
         {
           servers.add("localhost:" + changelogPorts[1]);
         }
         ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb" + i,
                                          0, changelogIds[i], 0, 100, servers);
         ReplServerFakeConfiguration conf = newReplServerConfiguration(changelogPorts, changelogIds, servers, i,
             "replicationServerTestReplicationServerConnectedDb"+i);
         changelogs[i] = new ReplicationServer(conf);
       }
@@ -1153,24 +1114,13 @@
         // - Test messages between clients by publishing now
         CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
         String user1entryUUID = "33333333-3333-3333-3333-333333333333";
         String baseUUID  = "22222222-2222-2222-2222-222222222222";
         // - Add
         String lentry = "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: " + user1entryUUID + "\n";
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
         AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
             entry.getAttributes(), new ArrayList<Attribute>());
         AddMsg addMsg = addMsg(csnGen);
         broker1.publish(addMsg);
         // - Modify
         Attribute attr1 = Attributes.create("description", "new value");
      List<Modification> mods =
          Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
         ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
         ModifyMsg modMsg = modMsg(csnGen);
         broker1.publish(modMsg);
         // - Check msg received by broker, through changeLog2
@@ -1183,15 +1133,15 @@
         SortedSet<String> servers = new TreeSet<>();
         // Configure replicationServer[0] to be disconnected from ReplicationServer[1]
         ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0, changelogIds[0], 0, 100, servers);
         changelogs[0].applyConfigurationChange(conf) ;
             newReplServerConfiguration(changelogPorts, changelogIds, servers, 0, "changelogDb0");
         changelogs[0].applyConfigurationChange(conf);
         // The link between RS[0] & RS[1] should be destroyed by the new configuration.
         // So we expect a timeout exception when calling receive on RS[1].
         // Send an update and check that RS[1] does not receive the message after the timeout
      // - Del
      DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), user1entryUUID);
      DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), addMsg.getEntryUUID());
      broker1.publish(delMsg);
      // Should receive some TopologyMsg messages for disconnection between the 2 RSs
      assertOnlyTopologyMsgsReceived(broker2);
@@ -1203,6 +1153,19 @@
    }
  }
  private ReplServerFakeConfiguration newReplServerConfiguration(int[] changelogPorts, int[] changelogIds,
      SortedSet<String> servers, int i, String dirName)
  {
    return new ReplServerFakeConfiguration(changelogPorts[i], dirName, 0, changelogIds[i], 0, 100, servers);
  }
  private ModifyMsg modMsg(CSNGenerator csnGen)
  {
    List<Modification> mods = Arrays.asList(new Modification(REPLACE, Attributes.create("description", "new value")));
    return new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
  }
  private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2)
  {
    try
@@ -1227,5 +1190,4 @@
      debugInfo("Ending replicationServerConnected");
    }
  }
}