From 27024516fd64857ad7f8ae6f364b09403f6dea8d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 06 Jan 2009 08:52:11 +0000
Subject: [PATCH]
---
opendj-sdk/opends/src/messages/messages/replication.properties | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 11 ++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 34 ++++----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 72 +++++++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 112 ++++++++++++++++++++++------
5 files changed, 160 insertions(+), 72 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index ca2ee16..fa9597f 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -358,4 +358,5 @@
SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \
received a safe data assured update message with incoherent level: %s, this is \
for domain %s. Message: %s
-
+SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \
+reset for domain %s
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7c4df2b..1f54b7f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.service;
@@ -170,7 +170,7 @@
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
- private ReplicationBroker broker;
+ private ReplicationBroker broker = null;
/**
* This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
{
// The task that initiated the operation.
Task initializeTask;
- // The input stream for the import
- ReplInputStream ldifImportInputStream = null;
// The target in the case of an export
short exportTarget = RoutableMsg.UNKNOWN_SERVER;
// The source in the case of an import
@@ -1553,7 +1551,6 @@
ieContext.setCounters(
initializeMessage.getEntryCount(),
initializeMessage.getEntryCount());
- ieContext.ldifImportInputStream = new ReplInputStream(this);
try
{
@@ -1682,6 +1679,52 @@
}
/**
+ * Check the value of the Replication Servers generation ID.
+ *
+ * @param generationID The expected value of the generation ID.
+ *
+ * @throws DirectoryException When the generation ID of the Replication
+ * Servers is not the expected value.
+ */
+ private void checkGenerationID(long generationID) throws DirectoryException
+ {
+ boolean flag = false;
+
+ for (int i = 0; i< 10; i++)
+ {
+ for (RSInfo rsInfo : getRsList())
+ {
+ if (rsInfo.getGenerationId() == generationID)
+ {
+ flag = true;
+ break;
+ }
+ else
+ {
+ try
+ {
+ Thread.sleep(i*100);
+ } catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ if (flag)
+ {
+ break;
+ }
+ }
+
+ if (!flag)
+ {
+ ResultCode resultCode = ResultCode.OTHER;
+ Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
+ throw new DirectoryException(
+ resultCode, message);
+ }
+ }
+
+ /**
* Reset the Replication Log.
* Calling this method will remove all the Replication information that
* was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
*/
public void resetReplicationLog() throws DirectoryException
{
+ // Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId((long)-1);
+
+ // check that at least one ReplicationServer did change its generation-id
+ checkGenerationID(-1);
+
+ // Reconnect to the Replication Server so that it adopt our
+ // GenerationID.
+ disableService();
+ enableService();
+
+ resetGenerationId(getGenerationID());
+
+ // check that at least one ReplicationServer did change its generation-id
+ checkGenerationID(getGenerationID());
}
/**
@@ -1715,8 +1772,7 @@
if (!isConnected())
{
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
- serviceID);
+ Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
throw new DirectoryException(
resultCode, message);
}
@@ -2088,26 +2144,29 @@
Collection<String> replicationServers, int window,
long heartbeatInterval) throws ConfigException
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId());
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId());
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
- DirectoryServer.registerMonitorProvider(monitor);
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
}
/**
@@ -2115,9 +2174,14 @@
* <p>
* After this method has been called, the Replication Service will start
* calling the {@link #processUpdate(UpdateMsg)}.
+ * <p>
+ * This method must be called once and must be called after the
+ * {@link #startPublishService(Collection, int, long)}.
+ *
*/
public void startListenService()
{
+ //
// Create the listener thread
listenerThread = new ListenerThread(this);
listenerThread.start();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 3d4941d..e049d81 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -231,7 +231,7 @@
fakeRs2.shutdown();
fakeRs2 = null;
}
-
+
if (fakeRs3 != null)
{
fakeRs3.shutdown();
@@ -400,7 +400,7 @@
{
port = rs3Port;
if (testCase.equals("testSafeDataManyRealRSs"))
- {
+ {
// Every 3 RSs connected together
replServers.add("localhost:" + rs1Port);
replServers.add("localhost:" + rs2Port);
@@ -603,7 +603,7 @@
debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg);
ok = false;
}
-
+
if (ok)
debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg);
else
@@ -739,7 +739,7 @@
/**
* Connect to RS
- * Returns true if connection was made successfuly
+ * Returns true if connection was made successfully
*/
public boolean connect()
{
@@ -941,7 +941,7 @@
{
return everyUpdatesAreOk;
}
-
+
public int nReceivedUpdates()
{
return nReceivedUpdates;
@@ -1166,7 +1166,7 @@
{
return new Object[][]
{
-
+
{ 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
{ 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
{ 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
@@ -1329,7 +1329,7 @@
if (objectArrayList.size() == 0)
{
- // First time we add some parameters, create first object arrays
+ // First time we add some parameters, create first object arrays
// Add each possible parameter as initial parameter lists
for (Object possibleParameter : possibleParameters)
{
@@ -1388,7 +1388,7 @@
String testCase = "testSafeDataLevelHigh";
debugInfo("Starting " + testCase);
-
+
assertTrue(sdLevel > 1);
int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from
@@ -1449,7 +1449,7 @@
fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
fakeRs3Scen);
assertNotNull(fakeRs3);
-
+
// Wait for connections to be finished
// DS must see expected numbers of fake DSs and RSs
waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
@@ -1477,7 +1477,7 @@
fail("No timeout is expected here");
}
long sendUpdateTime = System.currentTimeMillis() - startTime;
-
+
// Check
sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1551,7 +1551,7 @@
fail("No timeout is expected here");
}
sendUpdateTime = System.currentTimeMillis() - startTime;
-
+
// Check
sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1587,8 +1587,8 @@
{
fail("No timeout is expected here");
}
- sendUpdateTime = System.currentTimeMillis() - startTime;
-
+ sendUpdateTime = System.currentTimeMillis() - startTime;
+
// Check
sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1962,7 +1962,7 @@
/*
* Send an assured update using configured assured parameters
*/
-
+
long startTime = System.currentTimeMillis();
AckMsg ackMsg = null;
boolean timeout = false;
@@ -1997,7 +1997,7 @@
assertFalse(ackMsg.hasWrongStatus());
assertEquals(ackMsg.getFailedServers().size(), 0);
}
-
+
} finally
{
endTest();
@@ -2054,7 +2054,7 @@
rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase);
assertNotNull(rs3);
-
+
/*
* Start DS that will send updates
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 76c2cae..a274a90 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.service;
@@ -56,6 +56,8 @@
private int exportedEntryCount;
+ private long generationID = 1;
+
public FakeReplicationDomain(
String serviceID,
short serverID,
@@ -114,7 +116,7 @@
@Override
public long getGenerationID()
{
- return 1;
+ return generationID;
}
@Override
@@ -146,4 +148,9 @@
queue.add(updateMsg);
return true;
}
+
+ public void setGenerationID(long newGenerationID)
+ {
+ generationID = newGenerationID;
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 7261219..a3cdf79 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.service;
@@ -59,8 +59,10 @@
public void publishAndReceive() throws Exception
{
String testService = "test";
- ReplicationServer replServer = null;
- int replServerID = 10;
+ ReplicationServer replServer1 = null;
+ ReplicationServer replServer2 = null;
+ int replServerID1 = 10;
+ int replServerID2 = 20;
FakeReplicationDomain domain1 = null;
FakeReplicationDomain domain2 = null;
@@ -68,17 +70,33 @@
{
// find a free port for the replicationServer
ServerSocket socket = TestCaseUtils.bindFreePort();
- int replServerPort = socket.getLocalPort();
+ int replServerPort1 = socket.getLocalPort();
socket.close();
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(
- replServerPort, "ReplicationDomainTestDb",
- 0, replServerID, 0, 100, null);
+ socket = TestCaseUtils.bindFreePort();
+ int replServerPort2 = socket.getLocalPort();
+ socket.close();
- replServer = new ReplicationServer(conf);
+ TreeSet<String> replserver1 = new TreeSet<String>();
+ replserver1.add("localhost:" + replServerPort1);
+
+ TreeSet<String> replserver2 = new TreeSet<String>();
+ replserver2.add("localhost:" + replServerPort2);
+
+ ReplServerFakeConfiguration conf1 =
+ new ReplServerFakeConfiguration(
+ replServerPort1, "ReplicationDomainTestDb",
+ 0, replServerID1, 0, 100, replserver2);
+
+ ReplServerFakeConfiguration conf2 =
+ new ReplServerFakeConfiguration(
+ replServerPort2, "ReplicationDomainTestDb",
+ 0, replServerID2, 0, 100, replserver1);
+
+ replServer1 = new ReplicationServer(conf1);;
+ replServer2 = new ReplicationServer(conf2);
ArrayList<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + replServerPort);
+ servers.add("localhost:" + replServerPort1);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
domain1 = new FakeReplicationDomain(
@@ -99,32 +117,27 @@
assertNotNull(rcvdMsg);
assertEquals(test, rcvdMsg.getPayload());
-
/*
* Now test the resetReplicationLog() method.
*/
List<RSInfo> replServers = domain1.getRsList();
- // There should be one and only one server in the list.
- assertTrue(replServers.size() == 1);
+ for (RSInfo replServerInfo : replServers)
+ {
+ // The generation Id of the remote should be 1
+ assertTrue(replServerInfo.getGenerationId() == 1);
+ }
- RSInfo replServerInfo = replServers.get(0);
-
- // The generation Id of the remote should be 1
- assertTrue(replServerInfo.getGenerationId() == 1);
-
+ domain1.setGenerationID(2);
domain1.resetReplicationLog();
- Thread.sleep(1000);
replServers = domain1.getRsList();
- // There should be one and only one server in the list.
- assertTrue(replServers.size() == 1);
-
- replServerInfo = replServers.get(0);
-
- // The generation Id of the remote should now be -1
- assertTrue(replServerInfo.getGenerationId() == -1);
+ for (RSInfo replServerInfo : replServers)
+ {
+ // The generation Id of the remote should now be 2
+ assertTrue(replServerInfo.getGenerationId() == 2);
+ }
}
finally
{
@@ -134,8 +147,11 @@
if (domain2 != null)
domain2.disableService();
- if (replServer != null)
- replServer.remove();
+ if (replServer1 != null)
+ replServer1.remove();
+
+ if (replServer2 != null)
+ replServer2.remove();
}
}
--
Gitblit v1.10.0