/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.File; import java.net.ServerSocket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.ReplicationDomain; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DoneMessage; import org.opends.server.replication.protocol.EntryMessage; import org.opends.server.replication.protocol.ErrorMessage; import org.opends.server.replication.protocol.InitializeTargetMessage; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.SocketSession; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.tasks.LdifFileWriter; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** * Tests contained here: * * - testSingleRS : test generation ID setting with different servers and one * Replication server. * * - testMultiRS : tests generation ID propagatoion with more than one * Replication server. * */ public class GenerationIdTest extends ReplicationTestCase { // The tracer object for the debug logger private static final DebugTracer TRACER = getTracer(); private static final String baseDnStr = "dc=example,dc=com"; private static final String baseSnStr = "genidcom"; private static final int WINDOW_SIZE = 10; private static final int CHANGELOG_QUEUE_SIZE = 100; private static final short server1ID = 1; private static final short server2ID = 2; private static final short server3ID = 3; private static final short changelog1ID = 11; private static final short changelog2ID = 12; private static final short changelog3ID = 13; private DN baseDn; private ReplicationBroker broker2 = null; private ReplicationBroker broker3 = null; private ReplicationServer replServer1 = null; private ReplicationServer replServer2 = null; private ReplicationServer replServer3 = null; private boolean emptyOldChanges = true; ReplicationDomain replDomain = null; private Entry taskInitRemoteS2; SocketSession ssSession = null; boolean ssShutdownRequested = false; protected String[] updatedEntries; private static int[] replServerPort = new int[20]; /** * A temporary LDIF file containing some test entries. */ private File ldifFile; /** * A temporary file to contain rejected entries. */ private File rejectFile; /** * A makeldif template used to create some test entries. */ private static String diff = ""; private static String[] template = new String[] { "define suffix=" + baseDnStr, "define maildomain=example.com", "define numusers=11", "", "branch: [suffix]", "", "branch: ou=People,[suffix]", "subordinateTemplate: person:[numusers]", "", "template: person", "rdnAttr: uid", "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "givenName: ", "sn: ", "cn: {givenName} {sn}", "initials: {givenName:1}{sn:1}", "employeeNumber: ", "uid: user.{employeeNumber}", "mail: {uid}@[maildomain]", "userPassword: password", "telephoneNumber: ", "homePhone: ", "pager: ", "mobile: ", "street: Street", "l: ", "st: ", "postalCode: ", "postalAddress: {cn}${street}${l}, {st} {postalCode}", "description: This is the description for {cn} " + diff, ""}; private void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); if (debugEnabled()) { TRACER.debugInfo("** TEST **" + s); } } protected void debugInfo(String message, Exception e) { debugInfo(message + stackTraceToSingleLineString(e)); } /** * Set up the environment for performing the tests in this Class. * * @throws Exception * If the environment could not be set up. */ @BeforeClass public void setUp() throws Exception { //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled()); // This test suite depends on having the schema available. TestCaseUtils.startServer(); baseDn = DN.decode(baseDnStr); updatedEntries = newLDIFEntries(); // Create an internal connection in order to provide operations // to DS to populate the db - connection = InternalClientConnection.getRootConnection(); // Synchro provider String synchroStringDN = "cn=Synchronization Providers,cn=config"; // Synchro multi-master synchroPluginStringDN = "cn=Multimaster Synchronization, " + synchroStringDN; // Synchro suffix synchroServerEntry = null; // Add config entries to the current DS server based on : // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN // Add synchroServerEntry // Add replServerEntry configureReplication(); taskInitRemoteS2 = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-domain-dn: " + baseDn, "ds-task-initialize-replica-server-id: " + server2ID); // Change log String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; String changeLogLdif = "dn: " + changeLogStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n" + "ds-cfg-changelog-server-id: 1\n" + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); replServerEntry = null; } // Tests that entries have been written in the db private int testEntriesInDb() { debugInfo("TestEntriesInDb"); short found = 0; for (String entry : updatedEntries) { int dns = entry.indexOf("dn: "); int dne = entry.indexOf("dc=com"); String dn = entry.substring(dns+4,dne+6); debugInfo("Search Entry: " + dn); DN entryDN = null; try { entryDN = DN.decode(dn); } catch(Exception e) { debugInfo("TestEntriesInDb/" + e); } try { Entry resultEntry = getEntry(entryDN, 1000, true); if (resultEntry==null) { debugInfo("Entry not found <" + dn + ">"); } else { debugInfo("Entry found <" + dn + ">"); found++; } } catch(Exception e) { debugInfo("TestEntriesInDb/", e); } } return found; } /* * Creates entries necessary to the test. */ private String[] newLDIFEntries() { String[] entries = { "dn: " + baseDn + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 21111111-1111-1111-1111-111111111111\n" + "\n", "dn: ou=People," + baseDn + "\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n" + "entryUUID: 21111111-1111-1111-1111-111111111112\n" + "\n", "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Fiona Jensen\n" + "sn: Jensen\n" + "uid: fiona\n" + "telephonenumber: +1 408 555 1212\n" + "entryUUID: 21111111-1111-1111-1111-111111111113\n" + "\n", "dn: cn=Robert Langman,ou=people," + baseDn + "\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Robert Langman\n" + "sn: Langman\n" + "uid: robert\n" + "telephonenumber: +1 408 555 1213\n" + "entryUUID: 21111111-1111-1111-1111-111111111114\n" + "\n" }; return entries; } private int receiveImport(ReplicationBroker broker, short serverID, String[] updatedEntries) { // Expect the broker to receive the entries ReplicationMessage msg; short entriesReceived = 0; while (true) { try { debugInfo("Broker " + serverID + " Wait for entry or done msg"); msg = broker.receive(); if (msg == null) break; if (msg instanceof InitializeTargetMessage) { debugInfo("Broker " + serverID + " receives InitializeTargetMessage "); entriesReceived = 0; } else if (msg instanceof EntryMessage) { EntryMessage em = (EntryMessage)msg; debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes())); entriesReceived++; } else if (msg instanceof DoneMessage) { debugInfo("Broker " + serverID + " receives done "); break; } else if (msg instanceof ErrorMessage) { ErrorMessage em = (ErrorMessage)msg; debugInfo("Broker " + serverID + " receives ERROR " + em.toString()); break; } else { debugInfo("Broker " + serverID + " receives and trashes " + msg); } } catch(Exception e) { debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e)); } } if (updatedEntries != null) { assertTrue(entriesReceived == updatedEntries.length, " Received entries("+entriesReceived + ") == Expected entries("+updatedEntries.length+")"); } return entriesReceived; } /** * Creates a new replicationServer. * @param changelogId The serverID of the replicationServer to create. * @param all Specifies whether to coonect the created replication * server to the other replication servers in the test. * @return The new created replication server. */ private ReplicationServer createReplicationServer(short changelogId, boolean all, String suffix) { SortedSet servers = null; servers = new TreeSet(); try { if (changelogId==changelog1ID) { if (replServer1!=null) return replServer1; } else if (changelogId==changelog2ID) { if (replServer2!=null) return replServer2; } else if (changelogId==changelog3ID) { if (replServer3!=null) return replServer3; } if (all) { servers.add("localhost:" + getChangelogPort(changelog1ID)); servers.add("localhost:" + getChangelogPort(changelog2ID)); servers.add("localhost:" + getChangelogPort(changelog3ID)); } int chPort = getChangelogPort(changelogId); String chDir = "genid"+changelogId+suffix+"Db"; ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, servers); ReplicationServer replicationServer = new ReplicationServer(conf); Thread.sleep(1000); return replicationServer; } catch (Exception e) { fail("createChangelog" + stackTraceToSingleLineString(e)); } return null; } /** * Create a synchronized suffix in the current server providing the * replication Server ID. * @param changelogID */ private void connectServer1ToChangelog(short changelogID) { // Connect DS to the replicationServer try { // suffix synchronized String synchroServerStringDN = synchroPluginStringDN; String synchroServerLdif = "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-domain\n" + "cn: " + baseSnStr + "\n" + "ds-cfg-base-dn: " + baseDnStr + "\n" + "ds-cfg-replication-server: localhost:" + getChangelogPort(changelogID)+"\n" + "ds-cfg-server-id: " + server1ID + "\n" + "ds-cfg-receive-status: true\n" + "ds-cfg-window-size: " + WINDOW_SIZE; if (synchroServerEntry == null) { synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); configEntryList.add(synchroServerEntry.getDN()); replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn); } if (replDomain != null) { debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning()); } } catch(Exception e) { debugInfo("connectToReplServer", e); fail("connectToReplServer", e); } } /* * Disconnect DS from the replicationServer */ private void disconnectFromReplServer(short changelogID) { try { // suffix synchronized String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," + synchroPluginStringDN; if (synchroServerEntry != null) { DN synchroServerDN = DN.decode(synchroServerStringDN); DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null); assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null, "Unable to delete the synchronized domain"); synchroServerEntry = null; configEntryList.remove(configEntryList.indexOf(synchroServerDN)); } } catch(Exception e) { fail("disconnectFromReplServer", e); } } private int getChangelogPort(short changelogID) { if (replServerPort[changelogID] == 0) { try { // Find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); replServerPort[changelogID] = socket.getLocalPort(); socket.close(); } catch(Exception e) { fail("Cannot retrieve a free port for replication server." + e.getMessage()); } } return replServerPort[changelogID]; } protected static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id"; private long readGenId() { long genId=-1; try { Entry resultEntry = getEntry(baseDn, 1000, true); if (resultEntry==null) { debugInfo("Entry not found <" + baseDn + ">"); } else { debugInfo("Entry found <" + baseDn + ">"); AttributeType synchronizationGenIDType = DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); List attrs = resultEntry.getAttribute(synchronizationGenIDType); if (attrs != null) { Attribute attr = attrs.get(0); LinkedHashSet values = attr.getValues(); if (values.size() == 1) { genId = Long.decode(values.iterator().next().getStringValue()); } } } } catch(Exception e) { fail("Exception raised in readGenId", e); } return genId; } private Entry getTaskImport() { Entry task = null; try { // Create a temporary test LDIF file. ldifFile = File.createTempFile("import-test", ".ldif"); String resourcePath = DirectoryServer.getServerRoot() + File.separator + "config" + File.separator + "MakeLDIF"; LdifFileWriter.makeLdif(ldifFile.getPath(), resourcePath, template); // Create a temporary rejects file. rejectFile = File.createTempFile("import-test-rejects", ".ldif"); task = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-import", "ds-task-class-name: org.opends.server.tasks.ImportTask", "ds-task-import-backend-id: userRoot", "ds-task-import-ldif-file: " + ldifFile.getPath(), "ds-task-import-reject-file: " + rejectFile.getPath(), "ds-task-import-overwrite-rejects: TRUE", "ds-task-import-exclude-attribute: description" ); } catch(Exception e) { } return task; } private String createEntry(UUID uid) { String user2dn = "uid=user"+uid+",ou=People," + baseDnStr; return new String( "dn: "+ user2dn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user.1\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n" + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n"); } static protected ReplicationMessage createAddMsg() { Entry personWithUUIDEntry = null; String user1entryUUID; String baseUUID = null; String user1dn; /* * Create a Change number generator to generate new changenumbers * when we need to send operation messages to the replicationServer. */ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); user1entryUUID = "33333333-3333-3333-3333-333333333333"; user1dn = "uid=user1,ou=People," + baseDnStr; String entryWithUUIDldif = "dn: "+ user1dn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user.1\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" + "entryUUID: " + user1entryUUID + "\n"; try { personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif); } catch(Exception e) { fail(e.getMessage()); } // Create and publish an update message to add an entry. AddMsg addMsg = new AddMsg(gen.newChangeNumber(), personWithUUIDEntry.getDN().toString(), user1entryUUID, baseUUID, personWithUUIDEntry.getObjectClassAttribute(), personWithUUIDEntry.getAttributes(), new ArrayList()); return addMsg; } /** * SingleRS tests basic features of generationID * with one single Replication Server. * * @throws Exception */ @Test(enabled=false) public void testSingleRS() throws Exception { String testCase = "testSingleRS"; debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled()); debugInfo(testCase + " Clearing DS1 backend"); ReplicationDomain.clearJEBackend(false, "userRoot", baseDn.toNormalizedString()); try { long rgenId; long genId; replServer1 = createReplicationServer(changelog1ID, false, testCase); /* * Test : empty replicated backend * Check : nothing is broken - no generationId generated */ // Connect DS to RS with no data // Read generationId - should be not retrievable since no entry debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")"); connectServer1ToChangelog(changelog1ID); debugInfo(testCase + " Expect genId attribute to be not retrievable"); genId = readGenId(); assertEquals(genId,-1); debugInfo(testCase + " Expect genId to be set in memory on the replication " + " server side even if not wrote on disk/db since no change occurred."); rgenId = replServer1.getGenerationId(baseDn); assertEquals(rgenId, 3211313L); debugInfo(testCase + " Disconnecting DS1 from replServer1(" + changelog1ID + ")"); disconnectFromReplServer(changelog1ID); /* * Test : non empty replicated backend * Check : generationId correctly generated */ // Now disconnect - create entries and reconnect // Test that generation has been added to the data. debugInfo(testCase + " add test entries to DS"); this.addTestEntriesToDB(updatedEntries); connectServer1ToChangelog(changelog1ID); // Test that the generationId is written in the DB in the // root entry on the replica side genId = readGenId(); assertTrue(genId != -1); assertTrue(genId != 3211313L); // Test that the generationId is set on the replication server side rgenId = replServer1.getGenerationId(baseDn); assertEquals(genId, rgenId); /* * Test : Connection from 2nd broker with a different generationId * Check: We should receive an error message */ try { broker2 = openReplicationSession(baseDn, server2ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId+1); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } try { ReplicationMessage msg = broker2.receive(); if (!(msg instanceof ErrorMessage)) { fail("Broker connection is expected to receive an ErrorMessage." + msg); } ErrorMessage emsg = (ErrorMessage)msg; debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); } catch(SocketTimeoutException se) { fail("Broker is expected to receive an ErrorMessage."); } /* * Test : Connect with same generationId * Check : Must be accepted. */ try { broker3 = openReplicationSession(baseDn, server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } /* * Test : generationID persistence in Replication server * Shutdown/Restart Replication Server and redo connections * with valid and invalid generationId * Check : same expected connections results */ // The changes from broker2 should be ignored broker2.publish(createAddMsg()); try { broker3.receive(); fail("No update message is supposed to be received here."); } catch(SocketTimeoutException e) { // This is the expected result } // Now create a change that must be replicated String ent1[] = { createEntry(UUID.randomUUID()) }; this.addTestEntriesToDB(ent1); try { ReplicationMessage msg = broker3.receive(); debugInfo("Broker 3 received expected update msg" + msg); } catch(SocketTimeoutException e) { fail("Update message is supposed to be received."); } long genIdBeforeShut = replServer1.getGenerationId(baseDn); debugInfo("Shutdown replServer1"); broker2.stop(); broker2 = null; broker3.stop(); broker3 = null; replServer1.shutdown(); replServer1 = null; debugInfo("Create again replServer1"); replServer1 = createReplicationServer(changelog1ID, false, testCase); debugInfo("Delay to allow DS to reconnect to replServer1"); Thread.sleep(1000); long genIdAfterRestart = replServer1.getGenerationId(baseDn); debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart); assertTrue(replServer1!=null, "Replication server creation failed."); assertTrue(genIdBeforeShut == genIdAfterRestart, "generationId is expected to have the same value" + " after replServer1 restart. Before : " + genIdBeforeShut + " after : " + genIdAfterRestart); try { debugInfo("Create again broker2"); broker2 = openReplicationSession(baseDn, server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server"); debugInfo("Create again broker3"); broker3 = openReplicationSession(baseDn, server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server"); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } /* * * FIXME Should clearJEBackend() regenerate generationId and do a start * against ReplicationServer ? */ /* * Test: Reset the replication server in order to allow new data set. */ debugInfo("Launch an on-line import on DS."); genId=-1; Entry importTask = getTaskImport(); addTask(importTask, ResultCode.SUCCESS, null); waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(500); Entry taskReset = TestCaseUtils.makeEntry( "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-reset-generation-id", "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); debugInfo("Reset generationId"); addTask(taskReset, ResultCode.SUCCESS, null); waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(200); // TODO: Test that replication server db has been cleared debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import."); genId = readGenId(); assertTrue(genId != -1, "DS is expected to have a new genID computed " + " after on-line import but genId=" + genId); rgenId = replServer1.getGenerationId(baseDn); assertEquals(genId, rgenId, "DS and replServer are expected to have same genId."); assertTrue(!replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server1ID), "Expecting that DS is not degraded since domain genId has been reset"); assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server2ID), "Expecting that broker2 is degraded since domain genId has been reset"); assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server3ID), "Expecting that broker3 is degraded since domain genId has been reset"); // Now create a change that normally would be replicated // but will not be replicated here since DS and brokers are degraded String[] ent3 = { createEntry(UUID.randomUUID()) }; this.addTestEntriesToDB(ent3); try { ReplicationMessage msg = broker2.receive(); if (!(msg instanceof ErrorMessage)) { fail("Broker 2 connection is expected to receive an ErrorMessage." + msg); } ErrorMessage emsg = (ErrorMessage)msg; debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); } catch(SocketTimeoutException se) { fail("Broker 2 is expected to receive an ErrorMessage."); } try { ReplicationMessage msg = broker3.receive(); if (!(msg instanceof ErrorMessage)) { fail("Broker 3 connection is expected to receive an ErrorMessage." + msg); } ErrorMessage emsg = (ErrorMessage)msg; debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); } catch(SocketTimeoutException se) { fail("Broker 3 is expected to receive an ErrorMessage."); } try { ReplicationMessage msg = broker2.receive(); fail("No update message is supposed to be received by degraded broker2" + msg); } catch(SocketTimeoutException e) { /* expected */ } try { ReplicationMessage msg = broker3.receive(); fail("No update message is supposed to be received by degraded broker3"+ msg); } catch(SocketTimeoutException e) { /* expected */ } debugInfo("broker2 is publishing a change, " + "replServer1 expected to ignore this change."); broker2.publish(createAddMsg()); try { ReplicationMessage msg = broker3.receive(); fail("No update message is supposed to be received by degraded broker3"+ msg); } catch(SocketTimeoutException e) { /* expected */ } // In S1 launch the total update to initialize S2 addTask(taskInitRemoteS2, ResultCode.SUCCESS, null); // S2 should be re-initialized and have a new valid genId int receivedEntriesNb = this.receiveImport(broker2, server2ID, null); debugInfo("broker2 has been initialized from DS with #entries=" + receivedEntriesNb); debugInfo("Adding reset task to DS."); taskReset = TestCaseUtils.makeEntry( "dn: ds-task-id=resetgenid"+ UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-reset-generation-id", "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); addTask(taskReset, ResultCode.SUCCESS, null); waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(200); debugInfo("Verifying that replServer1 has been reset."); assertEquals(replServer1.getGenerationId(baseDn), rgenId); debugInfo("Disconnect DS from replServer1 (required in order to DEL entries)."); disconnectFromReplServer(changelog1ID); postTest(); debugInfo(testCase + " Clearing DS backend"); ReplicationDomain.clearJEBackend(false, replDomain.getBackend().getBackendID(), baseDn.toNormalizedString()); // At this moment, root entry of the domain has been removed so // genId is no more in the database ... but it has still the old // value in memory. testEntriesInDb(); replDomain.loadGenerationId(); debugInfo("Successfully ending " + testCase); } catch(Exception e) { fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** /** * SingleRS tests basic features of generationID * with more than one Replication Server. * The following test focus on: * - genId checking accross multiple starting RS (replication servers) * - genId setting propagation from one RS to the others * - genId reset propagation from one RS to the others */ @Test(enabled=false) public void testMultiRS() throws Exception { String testCase = "testMultiRS"; long genId; debugInfo("Starting " + testCase); ReplicationDomain.clearJEBackend(false, "userRoot", baseDn.toNormalizedString()); debugInfo ("Creating 3 RS"); replServer1 = createReplicationServer(changelog1ID, true, testCase); replServer2 = createReplicationServer(changelog2ID, true, testCase); replServer3 = createReplicationServer(changelog3ID, true, testCase); Thread.sleep(500); debugInfo("Connecting DS to replServer1"); connectServer1ToChangelog(changelog1ID); Thread.sleep(1500); debugInfo("Expect genId are set in all replServers."); assertEquals(replServer1.getGenerationId(baseDn), 3211313L, " in replServer1"); assertEquals(replServer2.getGenerationId(baseDn), 3211313L, " in replServer2"); assertEquals(replServer3.getGenerationId(baseDn), 3211313L, " in replServer3"); debugInfo("Disconnect DS from replServer1."); disconnectFromReplServer(changelog1ID); Thread.sleep(1000); debugInfo("Add entries to DS"); this.addTestEntriesToDB(updatedEntries); debugInfo("Connecting DS to replServer2"); connectServer1ToChangelog(changelog2ID); Thread.sleep(1000); debugInfo("Expect genIds to be set in all servers based on the added entries."); genId = readGenId(); assertTrue(genId != -1); assertEquals(replServer1.getGenerationId(baseDn), genId); assertEquals(replServer2.getGenerationId(baseDn), genId); assertEquals(replServer3.getGenerationId(baseDn), genId); debugInfo("Connecting broker2 to replServer3 with a good genId"); try { broker2 = openReplicationSession(baseDn, server2ID, 100, getChangelogPort(changelog3ID), 1000, !emptyOldChanges, genId); Thread.sleep(1000); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } debugInfo("Expecting that broker2 is not degraded since it has a correct genId"); assertTrue(!replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server2ID)); debugInfo("Disconnecting DS from replServer1"); disconnectFromReplServer(changelog1ID); debugInfo("Expect all genIds to keep their value since broker2 is still connected."); assertEquals(replServer1.getGenerationId(baseDn), genId); assertEquals(replServer2.getGenerationId(baseDn), genId); assertEquals(replServer3.getGenerationId(baseDn), genId); debugInfo("Connecting broker2 to replServer1 with a bad genId"); try { long badgenId=1; broker3 = openReplicationSession(baseDn, server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, badgenId); Thread.sleep(1000); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } debugInfo("Expecting that broker3 is degraded since it has a bad genId"); assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server3ID)); int found = testEntriesInDb(); assertEquals(found, updatedEntries.length, " Entries present in DB :" + found + " Expected entries :" + updatedEntries.length); debugInfo("Connecting DS to replServer1."); connectServer1ToChangelog(changelog1ID); Thread.sleep(1000); debugInfo("Adding reset task to DS."); Entry taskReset = TestCaseUtils.makeEntry( "dn: ds-task-id=resetgenid"+ UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-reset-generation-id", "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); addTask(taskReset, ResultCode.SUCCESS, null); waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(500); debugInfo("Verifying that all replservers genIds have been reset."); genId = readGenId(); assertEquals(replServer1.getGenerationId(baseDn), genId); assertEquals(replServer2.getGenerationId(baseDn), genId); assertEquals(replServer3.getGenerationId(baseDn), genId); debugInfo("Adding reset task to DS."); taskReset = TestCaseUtils.makeEntry( "dn: ds-task-id=resetgenid"+ UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-reset-generation-id", "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr, "ds-task-reset-generation-id-new-value: -1"); addTask(taskReset, ResultCode.SUCCESS, null); waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(500); debugInfo("Verifying that all replservers genIds have been reset."); genId = readGenId(); assertEquals(replServer1.getGenerationId(baseDn), -1); assertEquals(replServer2.getGenerationId(baseDn), -1); assertEquals(replServer3.getGenerationId(baseDn), -1); debugInfo("Disconnect DS from replServer1 (required in order to DEL entries)."); disconnectFromReplServer(changelog1ID); debugInfo("Cleaning entries"); postTest(); debugInfo("Successfully ending " + testCase); } /** * Disconnect broker and remove entries from the local DB * @throws Exception */ protected void postTest() { debugInfo("Post test cleaning."); // Clean brokers if (broker2 != null) broker2.stop(); broker2 = null; if (broker3 != null) broker3.stop(); broker3 = null; if (replServer1 != null) replServer1.remove(); if (replServer2 != null) replServer2.remove(); if (replServer3 != null) replServer3.remove(); replServer1 = null; replServer2 = null; replServer3 = null; super.cleanRealEntries(); try { ReplicationDomain.clearJEBackend(false, replDomain.getBackend().getBackendID(), baseDn.toNormalizedString()); // At this moment, root entry of the domain has been removed so // genId is no more in the database ... but it has still the old // value in memory. testEntriesInDb(); replDomain.loadGenerationId(); } catch (Exception e) {} } /** * Test generationID saving when the root entry does not exist * at the moement when the replication is enabled. * @throws Exception */ @Test(enabled=false, groups="slow") public void testServerStop() throws Exception { String testCase = "testServerStop"; debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled()); debugInfo(testCase + " Clearing DS1 backend"); ReplicationDomain.clearJEBackend(false, "userRoot", baseDn.toNormalizedString()); try { long genId; replServer1 = createReplicationServer(changelog1ID, false, testCase); /* * Test : empty replicated backend * Check : nothing is broken - no generationId generated */ // Connect DS to RS with no data // Read generationId - should be not retrievable since no entry debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")"); connectServer1ToChangelog(changelog1ID); Thread.sleep(1000); debugInfo(testCase + " Expect genId attribute to be not retrievable"); genId = readGenId(); assertEquals(genId,-1); this.addTestEntriesToDB(updatedEntries); debugInfo(testCase + " Expect genId attribute to be retrievable"); genId = readGenId(); assertEquals(genId, 3211313L); disconnectFromReplServer(changelog1ID); } finally { postTest(); debugInfo("Successfully ending " + testCase); } } /** * Loop opening sessions to the Replication Server * to check that it handle correctly deconnection and reconnection. */ @Test(enabled=false, groups="slow") public void testLoop() throws Exception { String testCase = "testLoop"; debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled()); long rgenId; ReplicationDomain.clearJEBackend(false, "userRoot", baseDn.toNormalizedString()); replServer1 = createReplicationServer(changelog1ID, false, testCase); replServer1.clearDb(); ReplicationBroker broker = null; try { for (int i=0; i< 100; i++) { long generationId = 1000+i; broker = openReplicationSession(baseDn, server2ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, generationId); debugInfo(testCase + " Expect genId to be set in memory on the replication " + " server side even if not wrote on disk/db since no change occurred."); rgenId = replServer1.getGenerationId(baseDn); if (rgenId != generationId) { fail("replication server failed to set generation ID"); replServer1.getGenerationId(baseDn); } broker.stop(); broker = null; } } finally { if (broker != null) broker.stop(); } } /** * This is used to make sure that the 3 tests are run in the * specified order since this is necessary. */ @Test(enabled=true, groups="slow") public void generationIdTest() throws Exception { testSingleRS(); testMultiRS(); testServerStop(); testLoop(); } }