mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
16.41.2009 2b6d9196406b28e334b525e5e7642d71a9722a4f
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -366,6 +366,26 @@
  }
  /**
   * Checks that the ChangeNumber given as a parameter is in this ServerState.
   *
   * @param   covered The ChangeNumber that should be checked.
   * @return  A boolean indicating if this ServerState contains the ChangeNumber
   *          given in parameter.
   */
  public boolean cover(ChangeNumber covered)
  {
    ChangeNumber change = this.list.get(covered.getServerId());
    if ((change == null) || (change.older(covered)))
    {
      return false;
    }
    else
    {
      return true;
    }
  }
  /**
   * Tests if the state is empty.
   *
   * @return True if the state is empty.
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1503,6 +1503,34 @@
  }
  /**
   * WARNING : only use this methods for tests purpose.
   *
   * Add the Replication Server given as a parameter in the list
   * of local replication servers.
   *
   * @param server The server to be added.
   */
  public static void onlyForTestsAddlocalReplicationServer(String server)
  {
    int separator = server.lastIndexOf(':');
    if (separator == -1)
      return ;
    int port = Integer.parseInt(server.substring(separator + 1));
    localPorts.add(port);
  }
  /**
   * WARNING : only use this methods for tests purpose.
   *
   * Clear the list of local Replication Servers
   *
   */
  public static void onlyForTestsClearLocalReplicationServerList()
  {
    localPorts.clear();
  }
  /**
   * This method allows to check if the Replication Server given
   * as the parameter is running in the local JVM.
   *
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1375,13 +1375,14 @@
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param serverId2 The server id for the suffix we are working for.
   * @param localServerId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @param groupId The groupId we prefer being connected to if possible
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
    Map<String, ServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId)
  {
    /*
     * Preference is given to servers with the requested group id:
@@ -1405,11 +1406,11 @@
    if (sameGroupIdRsInfos.size() > 0)
    {
      return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
        serverId2, baseDn);
        localServerId, baseDn);
    } else
    {
      return searchForBestReplicationServer(myState, rsInfos,
        serverId2, baseDn);
        localServerId, baseDn);
    }
  }
@@ -1422,12 +1423,12 @@
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param serverId2 The server id for the suffix we are working for.
   * @param localServerID The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
@@ -1455,6 +1456,8 @@
    }
    String bestServer = null;
    boolean bestServerIsLocal = false;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
@@ -1464,19 +1467,19 @@
    /*
     * Start loop to differentiate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId2);
      myChangeNumber = new ChangeNumber(0, 0, localServerID);
    }
    for (String repServer : rsInfos.keySet())
    {
      ServerState rsState = rsInfos.get(repServer).getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId2);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId2);
        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
      }
      // Store state in right list
@@ -1491,7 +1494,6 @@
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose either :
       * - The local one
@@ -1504,35 +1506,10 @@
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(), baseDn, Integer.toString(serverId2));
        upToDateServers.size(), baseDn, Integer.toString(localServerID));
      logError(message);
      /*
       * If there are local Replication Servers, remove all the other one
       * from the list so that we are sure that we choose a local one.
       */
      boolean localRS = false;
      for (String upServer : upToDateServers.keySet())
      {
        if (ReplicationServer.isLocalReplicationServer(upServer))
        {
          localRS = true;
          break;
        }
      }
      if (localRS)
      {
        Iterator<String> it = upToDateServers.keySet().iterator();
        while (it.hasNext())
        {
          if (!ReplicationServer.isLocalReplicationServer(it.next()))
          {
            it.remove();
          }
        }
      }
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
@@ -1570,7 +1547,7 @@
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        long shift = 0;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
@@ -1586,17 +1563,21 @@
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
          shift +=tmpShift;
        }
        boolean upServerIsLocal =
          ReplicationServer.isLocalReplicationServer(upServer);
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
            || ((shift < minShift) && upServerIsLocal)
            || (((bestServerIsLocal == false) && (shift < minShift)))
            || ((bestServerIsLocal == false) && (upServerIsLocal &&
                                              (shift<(minShift + 60)) ))
            || (shift+120 < minShift))
        {
          // This server is even closer to topo state
          bestServer = upServer;
          bestServerIsLocal = upServerIsLocal;
          minShift = shift;
        }
      } // For up to date servers
@@ -1616,26 +1597,32 @@
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId2);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId2);
          ourSidCn = new ChangeNumber(0, 0, localServerID);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        boolean lateServerisLocal =
          ReplicationServer.isLocalReplicationServer(lateServer);
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
          || ((tmpShift < minShift) && lateServerisLocal)
          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
          || ((bestServerIsLocal == false) && (lateServerisLocal &&
                                            (tmpShift<(minShift + 60)) ))
          || (tmpShift+120 < minShift))
        {
          // This sever is even closer to topo state
          // This server is even closer to topo state
          bestServer = lateServer;
          bestServerIsLocal = lateServerisLocal;
          minShift = tmpShift;
        }
      } // For late servers
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -41,6 +41,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -504,7 +506,7 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    cn = new ChangeNumber(4L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
@@ -920,4 +922,126 @@
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  @DataProvider(name = "create3ServersData")
  public Object[][] create3ServersData() {
    return new Object[][] {
        // first RS is up to date, the others are late none is local
        { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
        // test that the local RS  is chosen first when all up to date
        { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local ServerID is more important than the others
        { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
        // test that the local RS is chosen first even if it is a bit late
        { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local RS is not chosen first when it is very late
        { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
        // test that the local RS is not chosen first when it is missing
        // local changes
        { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
        // test that the local RS is not chosen first when it is missing
        // more local changes than another RS
        { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
        // test that the local RS is chosen first when it is missing
        // the same local changes as the other RS
        { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
        };
  }
  /**
   * Test with 3 replication servers (see data provider)
   */
  @Test(dataProvider =  "create3ServersData")
  public void test3Servers(
      long winnerT1, long winnerT2, long winnerT3, boolean winnerIsLocal,
      long looser1T1, long looser1T2, long looser1T3, boolean looser1IsLocal,
      long looser2T1, long looser2T2, long looser2T3, boolean looser2IsLocal)
      throws Exception
  {
    String testCase = "test3ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER  = "localhost:123";
    final String LOOSER1 = "localhost:456";
    final String LOOSER2 = "localhost:789";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(looser1T1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(looser1T2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(looser1T3, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    if (looser1IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(winnerT1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(winnerT2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(winnerT3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    if (winnerIsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(looser2T1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(looser2T2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(looser2T3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    if (looser2IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -165,6 +165,7 @@
   * Returns a bunch of single values for fractional-exclude configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testExcludePrecommitProvider")
  private Object[][] testExcludePrecommitProvider()
  {
@@ -178,6 +179,7 @@
   * Returns a bunch of single values for fractional-exclude configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testExcludeNightlyProvider")
  private Object[][] testExcludeNightlyProvider()
  {
@@ -305,6 +307,7 @@
   * Returns a bunch of single values for fractional-include configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testIncludePrecommitProvider")
  private Object[][] testIncludePrecommitProvider()
  {
@@ -318,6 +321,7 @@
   * Returns a bunch of single values for fractional-include configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testIncludeNightlyProvider")
  private Object[][] testIncludeNightlyProvider()
  {
@@ -1077,6 +1081,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateExcludePrecommitProvider")
  private Object[][] testInitWithFullUpdateExcludePrecommitProvider()
  {
@@ -1090,6 +1095,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateExcludeNightlyProvider")
  private Object[][] testInitWithFullUpdateExcludeNightlyProvider()
  {
@@ -1319,6 +1325,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateIncludePrecommitProvider")
  private Object[][] testInitWithFullUpdateIncludePrecommitProvider()
  {
@@ -1332,6 +1339,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateIncludeNightlyProvider")
  private Object[][] testInitWithFullUpdateIncludeNightlyProvider()
  {