From 2b6d9196406b28e334b525e5e7642d71a9722a4f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 16 Oct 2009 06:41:58 +0000
Subject: [PATCH] Fix for 4272 : Changes not replayed initially on adde
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 28 +++++
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 83 +++++++---------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 8 +
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 20 ++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java | 126 +++++++++++++++++++++++++
5 files changed, 216 insertions(+), 49 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 02aea22..61df06a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -366,6 +366,26 @@
}
/**
+ * Checks that the ChangeNumber given as a parameter is in this ServerState.
+ *
+ * @param covered The ChangeNumber that should be checked.
+ * @return A boolean indicating if this ServerState contains the ChangeNumber
+ * given in parameter.
+ */
+ public boolean cover(ChangeNumber covered)
+ {
+ ChangeNumber change = this.list.get(covered.getServerId());
+ if ((change == null) || (change.older(covered)))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
* Tests if the state is empty.
*
* @return True if the state is empty.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c093c86..b24c5e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1503,6 +1503,34 @@
}
/**
+ * WARNING : only use this methods for tests purpose.
+ *
+ * Add the Replication Server given as a parameter in the list
+ * of local replication servers.
+ *
+ * @param server The server to be added.
+ */
+ public static void onlyForTestsAddlocalReplicationServer(String server)
+ {
+ int separator = server.lastIndexOf(':');
+ if (separator == -1)
+ return ;
+ int port = Integer.parseInt(server.substring(separator + 1));
+ localPorts.add(port);
+ }
+
+ /**
+ * WARNING : only use this methods for tests purpose.
+ *
+ * Clear the list of local Replication Servers
+ *
+ */
+ public static void onlyForTestsClearLocalReplicationServerList()
+ {
+ localPorts.clear();
+ }
+
+ /**
* This method allows to check if the Replication Server given
* as the parameter is running in the local JVM.
*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index e058301..983cffa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1375,13 +1375,14 @@
* @param myState The local server state.
* @param rsInfos The list of available replication servers and their
* associated information (choice will be made among them).
- * @param serverId2 The server id for the suffix we are working for.
+ * @param localServerId The server id for the suffix we are working for.
* @param baseDn The suffix for which we are working for.
* @param groupId The groupId we prefer being connected to if possible
* @return The computed best replication server.
*/
public static String computeBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
+ Map<String, ServerInfo> rsInfos, int localServerId,
+ String baseDn, byte groupId)
{
/*
* Preference is given to servers with the requested group id:
@@ -1405,11 +1406,11 @@
if (sameGroupIdRsInfos.size() > 0)
{
return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
- serverId2, baseDn);
+ localServerId, baseDn);
} else
{
return searchForBestReplicationServer(myState, rsInfos,
- serverId2, baseDn);
+ localServerId, baseDn);
}
}
@@ -1422,12 +1423,12 @@
* @param myState The local server state.
* @param rsInfos The list of available replication servers and their
* associated information (choice will be made among them).
- * @param serverId2 The server id for the suffix we are working for.
+ * @param localServerID The server id for the suffix we are working for.
* @param baseDn The suffix for which we are working for.
* @return The computed best replication server.
*/
private static String searchForBestReplicationServer(ServerState myState,
- Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+ Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
{
/*
* Find replication servers who are up to date (or more up to date than us,
@@ -1455,6 +1456,8 @@
}
String bestServer = null;
+ boolean bestServerIsLocal = false;
+
// Servers up to dates with regard to our changes
HashMap<String, ServerState> upToDateServers =
new HashMap<String, ServerState>();
@@ -1464,19 +1467,19 @@
/*
* Start loop to differentiate up to date servers from late ones.
*/
- ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
+ ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
if (myChangeNumber == null)
{
- myChangeNumber = new ChangeNumber(0, 0, serverId2);
+ myChangeNumber = new ChangeNumber(0, 0, localServerID);
}
for (String repServer : rsInfos.keySet())
{
ServerState rsState = rsInfos.get(repServer).getServerState();
- ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId2);
+ ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
if (rsChangeNumber == null)
{
- rsChangeNumber = new ChangeNumber(0, 0, serverId2);
+ rsChangeNumber = new ChangeNumber(0, 0, localServerID);
}
// Store state in right list
@@ -1491,7 +1494,6 @@
if (upToDateServers.size() > 0)
{
-
/*
* Some up to date servers, among them, choose either :
* - The local one
@@ -1504,35 +1506,10 @@
*/
Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
- upToDateServers.size(), baseDn, Integer.toString(serverId2));
+ upToDateServers.size(), baseDn, Integer.toString(localServerID));
logError(message);
/*
- * If there are local Replication Servers, remove all the other one
- * from the list so that we are sure that we choose a local one.
- */
- boolean localRS = false;
- for (String upServer : upToDateServers.keySet())
- {
- if (ReplicationServer.isLocalReplicationServer(upServer))
- {
- localRS = true;
- break;
- }
- }
- if (localRS)
- {
- Iterator<String> it = upToDateServers.keySet().iterator();
- while (it.hasNext())
- {
- if (!ReplicationServer.isLocalReplicationServer(it.next()))
- {
- it.remove();
- }
- }
- }
-
- /*
* First of all, compute the virtual server state for the whole topology,
* which is composed of the most up to date change numbers for
* each server id in the topology.
@@ -1570,7 +1547,7 @@
* number loops and comes back to 0 (computation would have becomen
* meaningless).
*/
- long shift = -1L;
+ long shift = 0;
ServerState curState = upToDateServers.get(upServer);
Iterator<Integer> it = curState.iterator();
while (it.hasNext())
@@ -1586,17 +1563,21 @@
// Cannot be negative as topoState computed as being the max CN
// for each server id in the topology
long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
- if (tmpShift > shift)
- {
- shift = tmpShift;
- }
+ shift +=tmpShift;
}
+ boolean upServerIsLocal =
+ ReplicationServer.isLocalReplicationServer(upServer);
if ((minShift < 0) // First time in loop
- || (shift < minShift))
+ || ((shift < minShift) && upServerIsLocal)
+ || (((bestServerIsLocal == false) && (shift < minShift)))
+ || ((bestServerIsLocal == false) && (upServerIsLocal &&
+ (shift<(minShift + 60)) ))
+ || (shift+120 < minShift))
{
// This server is even closer to topo state
bestServer = upServer;
+ bestServerIsLocal = upServerIsLocal;
minShift = shift;
}
} // For up to date servers
@@ -1616,26 +1597,32 @@
long minShift = -1L;
for (String lateServer : lateOnes.keySet())
{
-
/*
* Choose the server who is the closest to us regarding our server id
* (this is the most up to date regarding our server id).
*/
ServerState curState = lateOnes.get(lateServer);
- ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId2);
+ ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
if (ourSidCn == null)
{
- ourSidCn = new ChangeNumber(0, 0, serverId2);
+ ourSidCn = new ChangeNumber(0, 0, localServerID);
}
// Cannot be negative as our Cn for our server id is strictly
// greater than those of the servers in late server list
long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
+ boolean lateServerisLocal =
+ ReplicationServer.isLocalReplicationServer(lateServer);
if ((minShift < 0) // First time in loop
- || (tmpShift < minShift))
+ || ((tmpShift < minShift) && lateServerisLocal)
+ || (((bestServerIsLocal == false) && (tmpShift < minShift)))
+ || ((bestServerIsLocal == false) && (lateServerisLocal &&
+ (tmpShift<(minShift + 60)) ))
+ || (tmpShift+120 < minShift))
{
- // This sever is even closer to topo state
+ // This server is even closer to topo state
bestServer = lateServer;
+ bestServerIsLocal = lateServerisLocal;
minShift = tmpShift;
}
} // For late servers
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 349767e..7588ade 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -41,6 +41,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -504,7 +506,7 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId2);
aState.update(cn);
- cn = new ChangeNumber(3L, 0, myId3);
+ cn = new ChangeNumber(4L, 0, myId3);
aState.update(cn);
replServerStartMsg =
new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
@@ -920,4 +922,126 @@
assertEquals(bestServer, WINNER, "Wrong best replication server.");
}
+
+
+ @DataProvider(name = "create3ServersData")
+ public Object[][] create3ServersData() {
+ return new Object[][] {
+ // first RS is up to date, the others are late none is local
+ { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
+
+ // test that the local RS is chosen first when all up to date
+ { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
+
+ // test that the local ServerID is more important than the others
+ { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
+
+ // test that the local RS is chosen first even if it is a bit late
+ { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
+
+ // test that the local RS is not chosen first when it is very late
+ { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
+
+ // test that the local RS is not chosen first when it is missing
+ // local changes
+ { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
+
+ // test that the local RS is not chosen first when it is missing
+ // more local changes than another RS
+ { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
+
+ // test that the local RS is chosen first when it is missing
+ // the same local changes as the other RS
+ { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
+ };
+ }
+
+ /**
+ * Test with 3 replication servers (see data provider)
+ */
+ @Test(dataProvider = "create3ServersData")
+ public void test3Servers(
+ long winnerT1, long winnerT2, long winnerT3, boolean winnerIsLocal,
+ long looser1T1, long looser1T2, long looser1T3, boolean looser1IsLocal,
+ long looser2T1, long looser2T2, long looser2T3, boolean looser2IsLocal)
+ throws Exception
+ {
+ String testCase = "test3ServersLate";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ int myId1 = 1;
+ int myId2 = 2;
+ int myId3 = 3;
+
+ // definitions for server names
+ final String WINNER = "localhost:123";
+ final String LOOSER1 = "localhost:456";
+ final String LOOSER2 = "localhost:789";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers info list
+ HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(looser1T1, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(looser1T2, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(looser1T3, 0, myId3);
+ aState.update(cn);
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+ if (looser1IsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(winnerT1, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(winnerT2, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(winnerT3, 0, myId3);
+ aState.update(cn);
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+ if (winnerIsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
+
+ // State for server 3
+ aState = new ServerState();
+ cn = new ChangeNumber(looser2T1, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(looser2T2, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(looser2T3, 0, myId3);
+ aState.update(cn);
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
+ if (looser2IsLocal)
+ ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+
+ ReplicationServer.onlyForTestsClearLocalReplicationServerList();
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 9e975fd..ef39c00 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -165,6 +165,7 @@
* Returns a bunch of single values for fractional-exclude configuration
* attribute
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testExcludePrecommitProvider")
private Object[][] testExcludePrecommitProvider()
{
@@ -178,6 +179,7 @@
* Returns a bunch of single values for fractional-exclude configuration
* attribute
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testExcludeNightlyProvider")
private Object[][] testExcludeNightlyProvider()
{
@@ -305,6 +307,7 @@
* Returns a bunch of single values for fractional-include configuration
* attribute
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testIncludePrecommitProvider")
private Object[][] testIncludePrecommitProvider()
{
@@ -318,6 +321,7 @@
* Returns a bunch of single values for fractional-include configuration
* attribute
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testIncludeNightlyProvider")
private Object[][] testIncludeNightlyProvider()
{
@@ -1077,6 +1081,7 @@
* Returns a bunch of single values for fractional configuration
* attributes
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testInitWithFullUpdateExcludePrecommitProvider")
private Object[][] testInitWithFullUpdateExcludePrecommitProvider()
{
@@ -1090,6 +1095,7 @@
* Returns a bunch of single values for fractional configuration
* attributes
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testInitWithFullUpdateExcludeNightlyProvider")
private Object[][] testInitWithFullUpdateExcludeNightlyProvider()
{
@@ -1319,6 +1325,7 @@
* Returns a bunch of single values for fractional configuration
* attributes
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testInitWithFullUpdateIncludePrecommitProvider")
private Object[][] testInitWithFullUpdateIncludePrecommitProvider()
{
@@ -1332,6 +1339,7 @@
* Returns a bunch of single values for fractional configuration
* attributes
*/
+ @SuppressWarnings("unused")
@DataProvider(name = "testInitWithFullUpdateIncludeNightlyProvider")
private Object[][] testInitWithFullUpdateIncludeNightlyProvider()
{
--
Gitblit v1.10.0