/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; import static org.testng.Assert.*; import java.util.List; import java.util.TreeSet; import java.util.concurrent.LinkedBlockingQueue; import java.net.ServerSocket; import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.opends.server.TestCaseUtils; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.testng.annotations.Test; /** * Test the Generic Replication Service. */ public class ReplicationDomainTest extends ReplicationTestCase { /** * Test that a ReplicationDomain is able to publish and receive UpdateMsg. * Also test the ReplicationDomain.resetReplicationLog() method. 
*/ @Test(enabled=true) public void publishAndReceive() throws Exception { String testService = "test"; ReplicationServer replServer1 = null; ReplicationServer replServer2 = null; int replServerID1 = 10; int replServerID2 = 20; FakeReplicationDomain domain1 = null; FakeReplicationDomain domain2 = null; try { // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int replServerPort1 = socket.getLocalPort(); socket.close(); socket = TestCaseUtils.bindFreePort(); int replServerPort2 = socket.getLocalPort(); socket.close(); TreeSet replserver1 = new TreeSet(); replserver1.add("localhost:" + replServerPort1); TreeSet replserver2 = new TreeSet(); replserver2.add("localhost:" + replServerPort2); ReplServerFakeConfiguration conf1 = new ReplServerFakeConfiguration( replServerPort1, "ReplicationDomainTestDb", 0, replServerID1, 0, 100, replserver2); ReplServerFakeConfiguration conf2 = new ReplServerFakeConfiguration( replServerPort2, "ReplicationDomainTestDb", 0, replServerID2, 0, 100, replserver1); replServer1 = new ReplicationServer(conf1);; replServer2 = new ReplicationServer(conf2); ArrayList servers = new ArrayList(1); servers.add("localhost:" + replServerPort1); BlockingQueue rcvQueue1 = new LinkedBlockingQueue(); domain1 = new FakeReplicationDomain( testService, (short) 1, servers, 100, 1000, rcvQueue1); BlockingQueue rcvQueue2 = new LinkedBlockingQueue(); domain2 = new FakeReplicationDomain( testService, (short) 2, servers, 100, 1000, rcvQueue2); /* * Publish a message from domain1, * Check that domain2 receives it shortly after. */ byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5}; domain1.publish(test); UpdateMsg rcvdMsg = rcvQueue2.poll(1, TimeUnit.SECONDS); assertNotNull(rcvdMsg); assertEquals(test, rcvdMsg.getPayload()); /* * Now test the resetReplicationLog() method. 
*/ List replServers = domain1.getRsList(); for (RSInfo replServerInfo : replServers) { // The generation Id of the remote should be 1 assertTrue(replServerInfo.getGenerationId() == 1); } for (DSInfo serverInfo : domain1.getDsList()) { assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS); } domain1.setGenerationID(2); domain1.resetReplicationLog(); replServers = domain1.getRsList(); for (RSInfo replServerInfo : replServers) { // The generation Id of the remote should now be 2 assertTrue(replServerInfo.getGenerationId() == 2); } for (DSInfo serverInfo : domain1.getDsList()) { if (serverInfo.getDsId() == 2) assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS); else { assertTrue(serverInfo.getDsId() == 1); assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS); } } for (DSInfo serverInfo : domain2.getDsList()) { if (serverInfo.getDsId() == 2) assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS); else { assertTrue(serverInfo.getDsId() == 1); assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS); } } } finally { if (domain1 != null) domain1.disableService(); if (domain2 != null) domain2.disableService(); if (replServer1 != null) replServer1.remove(); if (replServer2 != null) replServer2.remove(); } } /** * Test that a ReplicationDomain is able to export and import its database. 
 */
  @Test(enabled=true)
  public void exportAndImport() throws Exception
  {
    // NOTE(review): presumably the number of entries generated by the loop
    // below; the code that uses it is missing from this copy - confirm.
    final int ENTRYCOUNT=5000;
    String testService = "test";
    ReplicationServer replServer = null;
    int replServerID = 11;
    FakeReplicationDomain domain1 = null;
    FakeReplicationDomain domain2 = null;

    try
    {
      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();

      // A single replication server, configured with no peer servers (null).
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            replServerPort, "ReplicationDomainTestDb",
            0, replServerID, 0, 100, null);

      replServer = new ReplicationServer(conf);
      ArrayList servers = new ArrayList(1);
      servers.add("localhost:" + replServerPort);

      StringBuilder exportedDataBuilder = new StringBuilder();
      // NOTE(review): the text of this file is truncated on the next line.
      // Everything between the loop condition of exportAndImport() and the
      // body of a later method (senderInitialize(), judging by the javadoc
      // reference further down) is missing from this copy. The remaining
      // tokens are kept verbatim; restore the lost code from version control
      // before relying on this section.
      for (int i =0; i servers = new TreeSet();
      servers.add(HOST1 + SENDERPORT);
      servers.add(HOST2 + RECEIVERPORT);

      // Replication server listening on SENDERPORT, given both addresses.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            SENDERPORT, "ReplicationDomainTestDb",
            0, replServerID, 0, 100, servers);

      replServer = new ReplicationServer(conf);

      BlockingQueue rcvQueue1 = new LinkedBlockingQueue();
      domain1 = new FakeStressReplicationDomain(
          testService, (short) 2, servers, 100, 1000, rcvQueue1);

      // Keep the server and the domain running for a very long time.
      // NOTE(review): presumably so that a separately started
      // receiverInitialize() run can connect - confirm.
      System.out.println("waiting");
      Thread.sleep(1000000000);
    }
    finally
    {
      if (domain1 != null)
        domain1.disableService();

      if (replServer != null)
        replServer.remove();
    }
  }

  /**
   * See comments in senderInitialize() above.
   *
   * Disabled test (enabled=false): starts a replication server on
   * RECEIVERPORT and a FakeStressReplicationDomain with server id 1, waits
   * until another DS is listed by the domain, calls initializeFromRemote()
   * on that DS, then sleeps before cleaning up.
   */
  @Test(enabled=false)
  public void receiverInitialize() throws Exception
  {
    String testService = "test";
    ReplicationServer replServer = null;
    int replServerID = 11;
    FakeStressReplicationDomain domain1 = null;

    try
    {
      TreeSet servers = new TreeSet();
      servers.add(HOST1 + SENDERPORT);
      servers.add(HOST2 + RECEIVERPORT);

      // Replication server listening on RECEIVERPORT, given both addresses.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            RECEIVERPORT, "ReplicationDomainTestDb",
            0, replServerID, 0, 100, servers);

      replServer = new ReplicationServer(conf);

      BlockingQueue rcvQueue1 = new LinkedBlockingQueue();
      domain1 = new FakeStressReplicationDomain(
          testService, (short) 1, servers, 100, 100000, rcvQueue1);

      /*
       * Poll the DS list once per second until a DS other than this domain
       * is listed, then call initializeFromRemote() with that DS id.
       * NOTE(review): presumably this triggers a total update from the
       * remote DS to this domain - confirm against initializeFromRemote().
       */
      boolean alone = true;
      while (alone)
      {
        for (DSInfo remoteDS : domain1.getDsList())
        {
          if (remoteDS.getDsId() != domain1.getServerId())
          {
            alone = false;
            domain1.initializeFromRemote(remoteDS.getDsId() , null);
            break;
          }
        }
        if (alone)
        {
          System.out.println("trying...");
          Thread.sleep(1000);
        }
      }

      // Keep the domain and the server alive for a long time before the
      // cleanup in the finally block runs.
      System.out.println("waiting");
      Thread.sleep(10000000);
    }
    finally
    {
      if (domain1 != null)
        domain1.disableService();

      if (replServer != null)
        replServer.remove();
    }
  }
}