opends/src/messages/messages/replication.properties
@@ -358,4 +358,5 @@ SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \ received a safe data assured update message with incoherent level: %s, this is \ for domain %s. Message: %s SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \ reset for domain %s opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; @@ -170,7 +170,7 @@ * The ReplicationBroker that is used by this ReplicationDomain to * connect to the ReplicationService. */ private ReplicationBroker broker; private ReplicationBroker broker = null; /** * This Map is used to store all outgoing assured messages in order @@ -991,8 +991,6 @@ { // The task that initiated the operation. Task initializeTask; // The input stream for the import ReplInputStream ldifImportInputStream = null; // The target in the case of an export short exportTarget = RoutableMsg.UNKNOWN_SERVER; // The source in the case of an import @@ -1553,7 +1551,6 @@ ieContext.setCounters( initializeMessage.getEntryCount(), initializeMessage.getEntryCount()); ieContext.ldifImportInputStream = new ReplInputStream(this); try { @@ -1682,6 +1679,52 @@ } /** * Check the value of the Replication Servers generation ID. * * @param generationID The expected value of the generation ID. * * @throws DirectoryException When the generation ID of the Replication * Servers is not the expected value. */ private void checkGenerationID(long generationID) throws DirectoryException { boolean flag = false; for (int i = 0; i< 10; i++) { for (RSInfo rsInfo : getRsList()) { if (rsInfo.getGenerationId() == generationID) { flag = true; break; } else { try { Thread.sleep(i*100); } catch (InterruptedException e) { } } } if (flag) { break; } } if (!flag) { ResultCode resultCode = ResultCode.OTHER; Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); throw new DirectoryException( resultCode, message); } } /** * Reset the Replication Log. * Calling this method will remove all the Replication information that * was kept on all the Replication Servers currently connected in the @@ -1693,7 +1736,21 @@ */ public void resetReplicationLog() throws DirectoryException { // Reset the Generation ID to -1 to clean the ReplicationServers. resetGenerationId((long)-1); // check that at least one ReplicationServer did change its generation-id checkGenerationID(-1); // Reconnect to the Replication Server so that it adopt our // GenerationID. disableService(); enableService(); resetGenerationId(getGenerationID()); // check that at least one ReplicationServer did change its generation-id checkGenerationID(getGenerationID()); } /** @@ -1715,8 +1772,7 @@ if (!isConnected()) { ResultCode resultCode = ResultCode.OTHER; Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get( serviceID); Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID); throw new DirectoryException( resultCode, message); } @@ -2088,6 +2144,8 @@ Collection<String> replicationServers, int window, long heartbeatInterval) throws ConfigException { if (broker == null) { /* * create the broker object used to publish and receive changes */ @@ -2109,15 +2167,21 @@ DirectoryServer.registerMonitorProvider(monitor); } } /** * Starts the receiver side of the Replication Service. * <p> * After this method has been called, the Replication Service will start * calling the {@link #processUpdate(UpdateMsg)}. * <p> * This method must be called once and must be called after the * {@link #startPublishService(Collection, int, long)}. * */ public void startListenService() { // // Create the listener thread listenerThread = new ListenerThread(this); listenerThread.start(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; @@ -739,7 +739,7 @@ /** * Connect to RS * Returns true if connection was made successfuly * Returns true if connection was made successfully */ public boolean connect() { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; @@ -56,6 +56,8 @@ private int exportedEntryCount; private long generationID = 1; public FakeReplicationDomain( String serviceID, short serverID, @@ -114,7 +116,7 @@ @Override public long getGenerationID() { return 1; return generationID; } @Override @@ -146,4 +148,9 @@ queue.add(updateMsg); return true; } public void setGenerationID(long newGenerationID) { generationID = newGenerationID; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; @@ -59,8 +59,10 @@ public void publishAndReceive() throws Exception { String testService = "test"; ReplicationServer replServer = null; int replServerID = 10; ReplicationServer replServer1 = null; ReplicationServer replServer2 = null; int replServerID1 = 10; int replServerID2 = 20; FakeReplicationDomain domain1 = null; FakeReplicationDomain domain2 = null; @@ -68,17 +70,33 @@ { // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int replServerPort = socket.getLocalPort(); int replServerPort1 = socket.getLocalPort(); socket.close(); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration( replServerPort, "ReplicationDomainTestDb", 0, replServerID, 0, 100, null); socket = TestCaseUtils.bindFreePort(); int replServerPort2 = socket.getLocalPort(); socket.close(); replServer = new ReplicationServer(conf); TreeSet<String> replserver1 = new TreeSet<String>(); replserver1.add("localhost:" + replServerPort1); TreeSet<String> replserver2 = new TreeSet<String>(); replserver2.add("localhost:" + replServerPort2); ReplServerFakeConfiguration conf1 = new ReplServerFakeConfiguration( replServerPort1, "ReplicationDomainTestDb", 0, replServerID1, 0, 100, replserver2); ReplServerFakeConfiguration conf2 = new ReplServerFakeConfiguration( replServerPort2, "ReplicationDomainTestDb", 0, replServerID2, 0, 100, replserver1); replServer1 = new ReplicationServer(conf1);; replServer2 = new ReplicationServer(conf2); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + replServerPort); servers.add("localhost:" + replServerPort1); BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); domain1 = new FakeReplicationDomain( @@ -99,32 +117,27 @@ assertNotNull(rcvdMsg); assertEquals(test, rcvdMsg.getPayload()); /* * Now test the resetReplicationLog() method. */ List<RSInfo> replServers = domain1.getRsList(); // There should be one and only one server in the list. assertTrue(replServers.size() == 1); RSInfo replServerInfo = replServers.get(0); for (RSInfo replServerInfo : replServers) { // The generation Id of the remote should be 1 assertTrue(replServerInfo.getGenerationId() == 1); } domain1.setGenerationID(2); domain1.resetReplicationLog(); Thread.sleep(1000); replServers = domain1.getRsList(); // There should be one and only one server in the list. assertTrue(replServers.size() == 1); replServerInfo = replServers.get(0); // The generation Id of the remote should now be -1 assertTrue(replServerInfo.getGenerationId() == -1); for (RSInfo replServerInfo : replServers) { // The generation Id of the remote should now be 2 assertTrue(replServerInfo.getGenerationId() == 2); } } finally { @@ -134,8 +147,11 @@ if (domain2 != null) domain2.disableService(); if (replServer != null) replServer.remove(); if (replServer1 != null) replServer1.remove(); if (replServer2 != null) replServer2.remove(); } }