mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
25.36.2013 07c84bf1f348f59b8a460d37216381e554674c02
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -23,24 +23,18 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import java.io.File;
import org.opends.server.util.StaticUtils;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.types.DirectoryException;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.net.ServerSocket;
import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -54,9 +48,9 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static org.opends.server.TestCaseUtils.*;
/**
 * Test in real situations the algorithm for load balancing the DSs connections
@@ -76,7 +70,6 @@
  private static final int RS1_ID = 501;
  private static final int RS2_ID = 502;
  private static final int RS3_ID = 503;
  private static final int RS4_ID = 504;
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
@@ -90,42 +83,30 @@
    }
  }
  private void initTest()
  private void initTest() throws Exception
  {
    for (int i = 0 ; i < NDS; i++)
    {
      rd[i] = null;
    }
    for (int i = 0 ; i < NRS; i++)
    {
      rs[i] = null;
      rsPort[i] = -1;
    }
    Arrays.fill(rd, null);
    Arrays.fill(rs, null);
    Arrays.fill(rsPort, -1);
    findFreePorts();
  }
  /**
   * Find needed free TCP ports.
   */
  private void findFreePorts()
  private void findFreePorts() throws Exception
  {
    try
    {
      ServerSocket[] ss = new ServerSocket[NRS];
    ServerSocket[] ss = new ServerSocket[NRS];
      for (int i = 0; i < NRS; i++)
      {
        ss[i] = TestCaseUtils.bindFreePort();
        rsPort[i] = ss[i].getLocalPort();
      }
      for (int i = 0; i < NRS; i++)
      {
        ss[i].close();
      }
    } catch (IOException e)
    for (int i = 0; i < NRS; i++)
    {
      fail("Unable to determinate some free ports " +
        stackTraceToSingleLineString(e));
      ss[i] = TestCaseUtils.bindFreePort();
      rsPort[i] = ss[i].getLocalPort();
    }
    for (int i = 0; i < NRS; i++)
    {
      ss[i].close();
    }
  }
@@ -173,112 +154,54 @@
  {
    SortedSet<String> replServers = new TreeSet<String>();
    if (testCase.equals("testFailoversAndWeightChanges"))
    final int nbRSs = getNbRSs(testCase);
    for (int i = 0; i < nbRSs; i++)
    {
      // 4 servers used for this test case.
      for (int i = 0; i < NRS; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testSpreadLoad"))
      replServers.add("localhost:" + rsPort[i]);
    }
    return replServers;
  }
  private int getNbRSs(String testCase)
  {
    if (testCase.equals("testFailoversAndWeightChanges")
        || testCase.equals("testSpreadLoad"))
    {
      // 4 servers used for this test case.
      for (int i = 0; i < NRS; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
      return NRS;
    } else if (testCase.equals("testNoYoyo1"))
    {
      // 2 servers used for this test case.
      for (int i = 0; i < 2; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testNoYoyo2"))
      return 2;
    }
    else if (testCase.equals("testNoYoyo2") || testCase.equals("testNoYoyo3"))
    {
      // 3 servers used for this test case.
      for (int i = 0; i < 3; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else if (testCase.equals("testNoYoyo3"))
    {
      // 3 servers used for this test case.
      for (int i = 0; i < 3; i++)
      {
        replServers.add("localhost:" + rsPort[i]);
      }
    } else
      fail("Unknown test case: " + testCase);
    return replServers;
      return 3;
    }
    fail("Unknown test case: " + testCase);
    return 0; // dead code, but java does not know about it
  }
  /**
   * Creates a new ReplicationServer.
   */
  private ReplicationServer createReplicationServer(int rsIndex,
    int weight, String testCase)
    int weight, String testCase) throws Exception
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (testCase.equals("testFailoversAndWeightChanges"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testSpreadLoad"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo1"))
      {
        // 2 servers used for this test case.
        for (int i = 0; i < 2; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo2"))
      {
        // 3 servers used for this test case.
        for (int i = 0; i < 3; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testNoYoyo3"))
      {
        // 3 servers used for this test case.
        for (int i = 0; i < 3; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else
        fail("Unknown test case: " + testCase);
      String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
        replServers, 1, 1000, 5000, weight);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      return replicationServer;
    } catch (Exception e)
    int nbRSs = getNbRSs(testCase);
    for (int i = 0; i < nbRSs; i++)
    {
      fail("createReplicationServer " + stackTraceToSingleLineString(e));
      if (i != rsIndex){
        replServers.add("localhost:" + rsPort[i]);
      }
    }
    return null;
    String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
      replServers, 1, 1000, 5000, weight);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    return replicationServer;
  }
  /**
@@ -288,64 +211,40 @@
    (int rsIndex, int weight, String testCase)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (testCase.equals("testFailoversAndWeightChanges"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else if (testCase.equals("testSpreadLoad"))
      {
        // 4 servers used for this test case.
        for (int i = 0; i < NRS; i++)
        {
          if (i != rsIndex)
            replServers.add("localhost:" + rsPort[i]);
        }
      } else
        fail("Unknown test case: " + testCase);
      String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
        replServers, 1, 1000, 5000, weight);
      return conf;
    } catch (Exception e)
    if (testCase.equals("testFailoversAndWeightChanges")
        || testCase.equals("testSpreadLoad"))
    {
      fail("createReplicationServerConfigWithNewWeight " + stackTraceToSingleLineString(e));
    }
    return null;
      // 4 servers used for this test case.
      for (int i = 0; i < NRS; i++)
      {
        if (i != rsIndex)
          replServers.add("localhost:" + rsPort[i]);
      }
    } else
      fail("Unknown test case: " + testCase);
    String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
      replServers, 1, 1000, 5000, weight);
    return conf;
  }
  /**
   * Creates a new ReplicationDomain.
   */
  private LDAPReplicationDomain createReplicationDomain(int serverId,
    String testCase)
      String testCase) throws Exception
  {
    SortedSet<String> replServers = null;
    try
    {
      replServers = createRSListForTestCase(testCase);
      DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId+1, replServers, 1);
      LDAPReplicationDomain replicationDomain =
    SortedSet<String> replServers = createRSListForTestCase(testCase);
    DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId + 1, replServers, 1);
    LDAPReplicationDomain replicationDomain =
        MultimasterReplication.createNewDomain(domainConf);
      replicationDomain.start();
      return replicationDomain;
    } catch (Exception e)
    {
      fail("createReplicationDomain " + stackTraceToSingleLineString(e));
    }
    return null;
    replicationDomain.start();
    return replicationDomain;
  }
  /**
@@ -368,14 +267,11 @@
      /**
       * Start RS1 weigth=1, RS2 weigth=2, RS3 weigth=3, RS4 weigth=4
       */
      // Create and start RS1
      rs[0] = createReplicationServer(0, 1, testCase);
      // Create and start RS2
      rs[1] = createReplicationServer(1, 2, testCase);
      // Create and start RS3
      rs[2] = createReplicationServer(2, 3, testCase);
      // Create and start RS4
      rs[3] = createReplicationServer(3, 4, testCase);
      // Create and start RSs
      for (int i = 0; i < rs.length; i++)
      {
        rs[i] = createReplicationServer(i, i + 1, testCase);
      }
      // Start a first DS to make every RSs inter connect
      rd[0] = createReplicationDomain(0, testCase);
@@ -435,6 +331,7 @@
   *        timeout
   */
  private void checkRSConnectionsAndGenId(int[] rsIndexes, String msg)
      throws Exception
  {
    debugInfo("checkRSConnectionsAndGenId for <" + msg + ">");
    // Number of seconds to wait for condition before failing
@@ -450,7 +347,6 @@
      // Test connection
      boolean connected = false;
      boolean sameGenId = false;
      Iterator<ReplicationServerDomain> rsdIt = null;
      // Connected together ?
      int nOk = 0;
@@ -458,18 +354,15 @@
      {
        int rsIndex = rsIndexes[i];
        ReplicationServer repServer = rs[rsIndex];
        rsdIt = repServer.getDomainIterator();
        Iterator<ReplicationServerDomain> rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        Set<Integer> connectedRSsId = null;
        if (rsdIt != null)
        {
          connectedRSsId = rsdIt.next().getConnectedRSs().keySet();
        } else
        if (rsdIt == null)
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        Set<Integer> connectedRSsId = rsdIt.next().getConnectedRSs().keySet();
        // Does this RS see all other RSs
        int nPeer = 0;
        debugInfo("Checking RSs connected to RS " + curRsId);
@@ -506,36 +399,31 @@
      long refGenId = -1L;
      boolean refGenIdInitialized = false;
      nOk = 0;
      rsdIt = null;
      for (int i = 0; i < nRSs; i++)
      {
        ReplicationServer repServer = rs[i];
        rsdIt = repServer.getDomainIterator();
        Iterator<ReplicationServerDomain> rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        Long rsGenId = -1L;
        if (rsdIt != null)
        {
          rsGenId = rsdIt.next().getGenerationId();
        } else
        if (rsdIt == null)
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        Long rsGenId = rsdIt.next().getGenerationId();
        // Expecting all RSs to have gen id equal and not -1
        if ((rsGenId == -1L))
        if (rsGenId == -1L)
        {
          debugInfo("\tRS " + curRsId + " gen id is -1 which is not expected");
          break;
        } else
        }
        if (!refGenIdInitialized)
        {
          if (!refGenIdInitialized)
          {
            // Store reference gen id all RSs must have
            refGenId = rsGenId;
            refGenIdInitialized = true;
          }
          // Store reference gen id all RSs must have
          refGenId = rsGenId;
          refGenIdInitialized = true;
        }
        if (rsGenId == refGenId)
        {
@@ -566,21 +454,14 @@
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      Thread.sleep(1000);
      nSec++;
      if (nSec > secTimeout)
      {
        // Timeout reached, end with error
        fail("checkRSConnections: could not obtain that RSs are connected and have the same gen id after "
          + (nSec-1) + " seconds. [" + msg + "]");
      }
      // Timeout reached, end with error
      assertTrue(
          nSec <= secTimeout,
          "checkRSConnections: could not obtain that RSs are connected and have the same gen id after "
              + (nSec - 1) + " seconds. [" + msg + "]");
    }
  }
@@ -599,17 +480,14 @@
    try
    {
      /**
       * RS1 (weight=1) starts
       */
      rs[0] = createReplicationServer(0, 1, testCase);
      /**
       * DS1 starts and connects to RS1
       */
      rd[0] = createReplicationDomain(0, testCase);
      assertTrue(rd[0].isConnected());
      assertEquals(rd[0].getRsServerId(), RS1_ID);
@@ -617,7 +495,6 @@
      /**
       * RS2 (weight=1) starts
       */
      rs[1] = createReplicationServer(1, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1},
        "Waiting for RS2 connected to peers");
@@ -625,7 +502,6 @@
      /**
       * DS2 starts and connects to RS2
       */
      rd[1] = createReplicationDomain(1, testCase);
      assertTrue(rd[1].isConnected());
      assertEquals(rd[1].getRsServerId(), RS2_ID);
@@ -633,7 +509,6 @@
      /**
       * RS3 (weight=1) starts
       */
      rs[2] = createReplicationServer(2, 1, testCase);
      checkRSConnectionsAndGenId(new int[] {0, 1, 2},
        "Waiting for RS3 connected to peers");
@@ -641,7 +516,6 @@
      /**
       * DS3 starts and connects to RS3
       */
      rd[2] = createReplicationDomain(2, testCase);
      assertTrue(rd[2].isConnected());
      assertEquals(rd[2].getRsServerId(), RS3_ID);
@@ -649,7 +523,6 @@
      /**
       * DS4 starts and connects to RS1, RS2 or RS3
       */
      rd[3] = createReplicationDomain(3, testCase);
      assertTrue(rd[3].isConnected());
      int ds4ConnectedRsId = rd[3].getRsServerId();
@@ -661,7 +534,6 @@
      /**
       * DS5 starts and connects to one of the 2 other RSs
       */
      rd[4] = createReplicationDomain(4, testCase);
      assertTrue(rd[4].isConnected());
        int ds5ConnectedRsId = rd[4].getRsServerId();
@@ -672,7 +544,6 @@
      /**
       * DS6 starts and connects to the RS with one DS
       */
      rd[5] = createReplicationDomain(5, testCase);
      assertTrue(rd[5].isConnected());
        int ds6ConnectedRsId = rd[5].getRsServerId();
@@ -974,7 +845,7 @@
   *        timeout
   */
  private void checkForCorrectNumbersOfConnectedDSs(int[][] possibleExpectedDSsNumbers,
    String msg)
    String msg) throws Exception
  {
    // Time to wait before condition met: warning, this should let enough
    // time to the topology to auto-balance. Currently  this must at least let
@@ -988,10 +859,8 @@
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    {
      for (int i = 0; i < possibleExpectedDSsNumbers.length; i++)
      for (int[] expectedDSsNumbers : possibleExpectedDSsNumbers)
      {
        // Examine next possible final situation
        int[] expectedDSsNumbers = possibleExpectedDSsNumbers[i];
        // Examine connections
        int nOk = 0; // Number of RSs ok
        int nRSs = 0; // Number of RSs to examine
@@ -1029,23 +898,15 @@
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      Thread.sleep(1000);
      nSec++;
      if (nSec > secTimeout)
      {
        // Timeout reached, end with error
        fail("checkForCorrectNumbersOfConnectedDSs: could not get expected " +
          "connections " + intArrayToString(possibleExpectedDSsNumbers) + " after " + (nSec-1) +
          " seconds. Got this result : " + intArrayToString(finalDSsNumbers) +
          " [" + msg + "]");
      }
      // Timeout reached, end with error
      assertTrue(nSec <= secTimeout,
          "checkForCorrectNumbersOfConnectedDSs: could not get expected "
              + "connections " + intArrayToString(possibleExpectedDSsNumbers)
              + " after " + (nSec - 1) + " seconds. Got this result : "
              + intArrayToString(finalDSsNumbers) + " [" + msg + "]");
    }
  }