From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 288 ++++++++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 220 insertions(+), 68 deletions(-)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index c9901b6..07dceca 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication;
@@ -61,10 +61,13 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.service.ReplicationBroker;
+import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -146,9 +149,11 @@
boolean emptyOldChanges = true;
LDAPReplicationDomain replDomain = null;
+ int initWindow = 100;
+
private void log(String s)
{
- logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
"InitOnLineTests/" + s));
if (debugEnabled())
{
@@ -183,7 +188,8 @@
// clear it.
LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
- updatedEntries = newLDIFEntries();
+ // For most tests, a limited number of entries is enough
+ updatedEntries = newLDIFEntries(2);
// Create an internal connection in order to provide operations
// to DS to populate the db -
@@ -283,6 +289,7 @@
private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
long expectedLeft, long expectedDone)
{
+ log("waitTaskCompleted " + taskEntry.toLDIFString());
try
{
// FIXME - Factorize with TasksTestCase
@@ -398,6 +405,22 @@
for (String ldifEntry : updatedEntries)
{
Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+ addTestEntryToDB(entry);
+ // They will be removed at the end of the test
+ entryList.addLast(entry.getDN());
+ }
+ log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
+ }
+ catch(Exception e)
+ {
+ fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ private void addTestEntryToDB(Entry entry)
+ {
+ try
+ {
AddOperationBasis addOp = new AddOperationBasis(connection,
InternalClientConnection.nextOperationID(), InternalClientConnection
.nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
@@ -411,18 +434,17 @@
// They will be removed at the end of the test
entryList.addLast(entry.getDN());
- }
}
catch(Exception e)
{
- fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
}
}
/*
* Creates entries necessary to the test.
*/
- private String[] newLDIFEntries()
+ private String[] newLDIFEntries(int entriesCnt)
{
// It is relevant to test ReplLDIFInputStream
// and ReplLDIFOutputStream with big entries
@@ -430,46 +452,76 @@
for (int i=0; i<bigAttributeValue.length; i++)
bigAttributeValue[i] = Integer.toString(i).charAt(0);
- String[] entries =
- {
+ String[] entries = new String[entriesCnt + 2];
+ String filler = "000000000000000000000000000000000000";
+
+ entries[0] = new String(
"dn: " + EXAMPLE_DN + "\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
+ "dc: example\n"
+ "entryUUID: 21111111-1111-1111-1111-111111111111\n"
- + "\n",
+ + "\n");
+ entries[1] = new String(
"dn: ou=People," + EXAMPLE_DN + "\n"
+ "objectClass: top\n"
+ "objectClass: organizationalUnit\n"
+ "ou: People\n"
+ "entryUUID: 21111111-1111-1111-1111-111111111112\n"
- + "\n",
- "dn: cn=Fiona Jensen,ou=people," + EXAMPLE_DN + "\n"
+ + "\n");
+
+ for (int i=0; i<entriesCnt; i++)
+ {
+ String useri="0000"+i;
+ entries[i+2] = new String(
+ "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
+ "objectclass: top\n"
+ "objectclass: person\n"
+ "objectclass: organizationalPerson\n"
+ "objectclass: inetOrgPerson\n"
- + "cn: Fiona Jensen\n"
- + "sn: Jensen\n"
- + "uid: fiona\n"
+ + "cn: "+useri+"_cn"+"\n"
+ + "sn: "+useri+"_sn"+"\n"
+ + "uid: "+useri+"_uid"+"\n"
+ "telephonenumber:: "+ Base64.encode(
new String(bigAttributeValue).getBytes())+"\n"
- + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
- + "\n",
- "dn: cn=Robert Langman,ou=people," + EXAMPLE_DN + "\n"
+ + "entryUUID: 21111111-1111-1111-1111-"+useri+
+ filler.substring(0, 12-useri.length())+"\n"
+ + "\n");
+ };
+
+ return entries;
+ }
+
+ /*
+ * Creates entries necessary to the test.
+ */
+ private String newLDIFEntry(int entryCnt)
+ {
+ // It is relevant to test ReplLDIFInputStream
+ // and ReplLDIFOutputStream with big entries
+ char bigAttributeValue[] = new char[30240];
+ for (int i=0; i<bigAttributeValue.length; i++)
+ bigAttributeValue[i] = Integer.toString(i).charAt(0);
+
+ String filler = "000000000000000000000000000000000000";
+
+ String useri="0000"+entryCnt;
+
+ return new String(
+ "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
+ "objectclass: top\n"
+ "objectclass: person\n"
+ "objectclass: organizationalPerson\n"
+ "objectclass: inetOrgPerson\n"
- + "cn: Robert Langman\n"
- + "sn: Langman\n"
- + "uid: robert\n"
- + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
- + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
- + "\n"
- };
+ + "cn: "+useri+"_cn"+"\n"
+ + "sn: "+useri+"_sn"+"\n"
+ + "uid: "+useri+"_uid"+"\n"
+ + "telephonenumber:: "+ Base64.encode(
+ new String(bigAttributeValue).getBytes())+"\n"
+ + "entryUUID: 21111111-1111-1111-1111-"+useri+
+ filler.substring(0, 12-useri.length())+"\n"
+ + "\n");
- return entries;
}
/**
@@ -488,15 +540,16 @@
RoutableMsg initTargetMessage =
new InitializeTargetMsg(
EXAMPLE_DN, server2ID, destinationServerID, requestorID,
- updatedEntries.length);
+ updatedEntries.length, initWindow);
broker.publish(initTargetMessage);
+ int cnt = 0;
for (String entry : updatedEntries)
{
- log("Broker will pusblish 1 entry: bytes:"+ entry.length());
+ log("Broker will publish 1 entry: bytes:"+ entry.length());
EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
- entry.getBytes());
+ entry.getBytes(), ++cnt);
broker.publish(entryMsg);
}
@@ -559,7 +612,7 @@
}
catch (SocketTimeoutException e)
{
- log("SocketTimeoutException while waiting fro entries" +
+ log("SocketTimeoutException while waiting for entries" +
stackTraceToSingleLineString(e));
}
catch(Exception e)
@@ -571,6 +624,11 @@
assertTrue(entriesReceived == updatedEntries.length,
" Received entries("+entriesReceived +
") == Expected entries("+updatedEntries.length+")");
+
+ broker.setGenerationID(EMPTY_DN_GENID);
+ broker.reStart(true);
+ try { Thread.sleep(500); } catch(Exception e) {}
+
}
/**
@@ -643,6 +701,11 @@
*/
private void connectServer1ToChangelog(int changelogID)
{
+ connectServer1ToChangelog(changelogID, 0);
+ }
+
+ private void connectServer1ToChangelog(int changelogID, int heartbeat)
+ {
// Connect DS to the replicationServer
try
{
@@ -651,7 +714,7 @@
String synchroServerLdif =
"dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
+ "objectClass: top\n"
- + "objectC7lass: ds-cfg-synchronization-provider\n"
+ + "objectClass: ds-cfg-synchronization-provider\n"
+ "objectClass: ds-cfg-replication-domain\n"
+ "cn: " + testName + "\n"
+ "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
@@ -659,7 +722,7 @@
+ getChangelogPort(changelogID)+"\n"
+ "ds-cfg-server-id: " + server1ID + "\n"
+ "ds-cfg-receive-status: true\n"
-// + "ds-cfg-heartbeat-interval: 0 ms\n"
+ + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
+ "ds-cfg-window-size: " + WINDOW_SIZE;
@@ -706,11 +769,19 @@
/**
* Tests the import side of the Initialize task
+ * Test steps :
+ * - create a task 'InitFromS2' in S1
+ * - make S2 export its entries
+ * - test that S1 has succesfully imported the entries and completed the task.
+ *
+ * TODO: Error case: make S2 crash/disconnect in the middle of the export
+ * and test that, on S1 side, the task ends with an error.
+ * State of the backend on S1 partially initialized: ?
*/
@Test(enabled=true, groups="slow")
public void initializeImport() throws Exception
{
- String testCase = "initializeImport";
+ String testCase = "initializeImport ";
log("Starting "+testCase);
@@ -740,8 +811,8 @@
InitializeRequestMsg initMsg = (InitializeRequestMsg)msg;
// S2 publishes entries to S1
- makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
- initMsg.getsenderID());
+ makeBrokerPublishEntries(server2, server2ID, initMsg.getSenderID(),
+ initMsg.getSenderID());
// Wait for task (import) completion in S1
waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
@@ -757,12 +828,16 @@
fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
/**
* Tests the export side of the Initialize task
+ * Test steps :
+ * - add entries in S1, make S2 publish InitRequest
+ * - test that S1 has succesfully exported the entries (by receiving them
+ * on S2 side).
*/
@Test(enabled=true, groups="slow")
public void initializeExport() throws Exception
@@ -789,20 +864,27 @@
// Thread.sleep(3000);
InitializeRequestMsg initMsg = new InitializeRequestMsg(EXAMPLE_DN,
- server2ID, server1ID);
+ server2ID, server1ID, 100);
server2.publish(initMsg);
+ // Signal RS we just entered the full update status
+ server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
receiveUpdatedEntries(server2, server2ID, updatedEntries);
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
/**
* Tests the import side of the InitializeTarget task
+ * Test steps :
+ * - add entries in S1 and create a task 'InitTargetS2' in S1
+ * - wait task completed
+ * - test that S2 has succesfully received the entries
*/
@Test(enabled=true, groups="slow")
public void initializeTargetExport() throws Exception
@@ -832,21 +914,33 @@
// Launch in S1 the task that will initialize S2
addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
- // Wait for task completion
- waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+ // Signal RS we just entered the full update status
+ server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
// Tests that entries have been received by S2
receiveUpdatedEntries(server2, server2ID, updatedEntries);
+ // Wait for task completion
+ waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
/**
* Tests the import side of the InitializeTarget task
+ * Test steps :
+ * - addEntries in S1, create a task 'InitAll' in S1
+ * - wait task completed on S1
+ * - test that S2 and S3 have succesfully imported the entries.
+ *
+ * TODO: Error case: make S1 crash in the middle of the export and test that
+ * the task ends with an error. State of the backend on both S2 and S3: ?
+ *
+ * TODO: Error case: make S2 crash in the middle of the import and test what??
*/
@Test(enabled=true, groups="slow")
public void initializeTargetExportAll() throws Exception
@@ -879,17 +973,22 @@
// Launch in S1 the task that will initialize S2
addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
- // Wait for task completion
- waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
-
// Tests that entries have been received by S2
+
+ // Signal RS we just entered the full update status
+ server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+ server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
receiveUpdatedEntries(server2, server2ID, updatedEntries);
receiveUpdatedEntries(server3, server3ID, updatedEntries);
+ // Wait for task completion
+ waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
+
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -921,13 +1020,20 @@
// wait until the replication domain has expected generationID
// this should indicate that the import occured correctly.
- for (int count = 0; count < 100; count++)
+ for (int count = 0; count < 120; count++)
{
- if (replDomain.getGenerationID() == 56869)
+ if (replDomain.getGenerationID() == 53235)
break;
- Thread.sleep(200);
+ log(testCase + " genId=" + replDomain.getGenerationID());
+ Thread.sleep(1000);
}
+ if (replDomain.getGenerationID() != 53235)
+ {
+ fail(testCase + " Import success waited longer than expected \n" +
+ TestCaseUtils.threadStacksToString());
+ }
+
// Test that entries have been imported in S1
testEntriesInDb();
@@ -938,7 +1044,7 @@
fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -993,7 +1099,7 @@
fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1052,7 +1158,8 @@
"ds-task-initialize-domain-dn: " + baseDn,
"ds-task-initialize-replica-server-id: -3");
addTask(taskInit, ResultCode.OTHER,
- ERR_INVALID_IMPORT_SOURCE.get());
+ ERR_INVALID_IMPORT_SOURCE.get(baseDn.toNormalizedString(),
+ Integer.toString(server1ID),"-3",""));
// Scope containing a serverID absent from the domain
// createTask(taskInitTargetS2);
@@ -1064,7 +1171,7 @@
fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1156,23 +1263,24 @@
// TODO Test ReplicationServerDomain.getDestinationServers method.
+ log("Successfully ending " + testCase);
+
} finally
{
if (broker2 != null)
broker2.stop();
if (broker3 != null)
broker3.stop();
- afterTest();
+ afterTest(testCase);
}
}
@Test(enabled=true, groups="slow")
public void initializeTargetExportMultiSS() throws Exception
{
+ String testCase = "initializeTargetExportMultiSS";
try
{
- String testCase = "initializeTargetExportMultiSS";
-
log("Starting " + testCase);
// Create 2 changelogs
@@ -1190,6 +1298,7 @@
// connected to changelog2
if (server2 == null)
{
+ log(testCase + " Will connect server 2 to " + changelog2ID);
server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
}
@@ -1197,19 +1306,40 @@
// Thread.sleep(1000);
// Launch in S1 the task that will initialize S2
+ log(testCase + " add task " + Thread.currentThread());
addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
- // Wait for task completion
- waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+ log(testCase + " " + server2.getServerId() + " wait target " + Thread.currentThread());
+ ReplicationMsg msgrcv;
+ do
+ {
+ msgrcv = server2.receive();
+ log(testCase + " " + server2.getServerId() + " receives " + msgrcv);
+ }
+ while(!(msgrcv instanceof InitializeTargetMsg));
+ assertTrue(msgrcv instanceof InitializeTargetMsg, msgrcv.getClass().getCanonicalName());
+
+ // Signal RS we just entered the full update status
+ log(testCase + " change status");
+ server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
// Tests that entries have been received by S2
+ log(testCase + " receive entries");
receiveUpdatedEntries(server2, server2ID, updatedEntries);
+ // Wait for task completion
+ log(testCase + " wait task completed");
+ waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+
log("Successfully ending " + testCase);
}
+ catch(Exception e)
+ {
+ log(testCase + e.getLocalizedMessage());
+ }
finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1263,17 +1393,34 @@
// S3 sends init request
log(testCase + " server 3 Will send reqinit to " + server1ID);
InitializeRequestMsg initMsg =
- new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID);
+ new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID, 100);
server3.publish(initMsg);
// S3 should receive target, entries & done
+ log(testCase + " Wait for InitializeTargetMsg");
+
+ ReplicationMsg msgrcv = null;
+ do
+ {
+ msgrcv = server3.receive();
+ log(testCase + " receives "+ msgrcv);
+ }
+ while (!(msgrcv instanceof InitializeTargetMsg));
+ assertTrue(msgrcv instanceof InitializeTargetMsg,msgrcv.getClass().getCanonicalName() +
+ msgrcv);
+
+ // Signal RS we just entered the full update status
+ server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
log(testCase + " Will verify server 3 has received expected entries");
receiveUpdatedEntries(server3, server3ID, updatedEntries);
+ log(testCase + " Will verify no more msgs");
while (true)
{
try
{
+ log(testCase + " Will receive");
ReplicationMsg msg = server3.receive();
fail("Receive unexpected message " + msg);
} catch (SocketTimeoutException e)
@@ -1282,11 +1429,11 @@
break;
}
}
-
log("Successfully ending " + testCase);
- } finally
+ }
+ finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1318,7 +1465,8 @@
addTask(taskInit, ResultCode.SUCCESS, null);
waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
- ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+ ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+ baseDn.toString(), "20"));
// Test 2
taskInit = TestCaseUtils.makeEntry(
@@ -1331,7 +1479,9 @@
"ds-task-initialize-domain-dn: " + baseDn,
"ds-task-initialize-replica-server-id: " + server1ID);
- addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
+ addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get(
+ baseDn.toNormalizedString(),
+ Integer.toString(server1ID),"20",""));
if (replDomain != null)
{
@@ -1342,7 +1492,7 @@
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1376,7 +1526,7 @@
addTask(taskInit, ResultCode.SUCCESS, null);
waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
- ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+ ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDn.toString(), "0"));
if (replDomain != null)
{
@@ -1387,7 +1537,7 @@
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
@@ -1483,14 +1633,15 @@
log("Successfully ending " + testCase);
} finally
{
- afterTest();
+ afterTest(testCase);
}
}
/**
* Disconnect broker and remove entries from the local DB
+ * @param testCase The name of the test case.
*/
- protected void afterTest()
+ protected void afterTest(String testCase)
{
// Check that the domain has completed the import/export task.
@@ -1564,6 +1715,7 @@
{
replServerPort[i] = 0;
}
+ log("Successfully cleaned " + testCase);
}
/**
--
Gitblit v1.10.0