Improvements for Issue 3640: Generic Replication Service.
With these changes, the ReplicationDomain.resetReplicationLog() method
returns only after the reset is fully done.
Previously, the caller had to sleep() for a while to avoid restarting the
service before the reset was completed.
| | |
| | | SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \ |
| | | received a safe data assured update message with incoherent level: %s, this is \ |
| | | for domain %s. Message: %s |
| | | |
| | | SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \ |
| | | reset for domain %s |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Copyright 2008-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | * The ReplicationBroker that is used by this ReplicationDomain to |
| | | * connect to the ReplicationService. |
| | | */ |
| | | private ReplicationBroker broker; |
| | | private ReplicationBroker broker = null; |
| | | |
| | | /** |
| | | * This Map is used to store all outgoing assured messages in order |
| | |
| | | { |
| | | // The task that initiated the operation. |
| | | Task initializeTask; |
| | | // The input stream for the import |
| | | ReplInputStream ldifImportInputStream = null; |
| | | // The target in the case of an export |
| | | short exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | |
| | | ieContext.setCounters( |
| | | initializeMessage.getEntryCount(), |
| | | initializeMessage.getEntryCount()); |
| | | ieContext.ldifImportInputStream = new ReplInputStream(this); |
| | | |
| | | try |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check the value of the Replication Servers generation ID. |
| | | * |
| | | * @param generationID The expected value of the generation ID. |
| | | * |
| | | * @throws DirectoryException When the generation ID of the Replication |
| | | * Servers is not the expected value. |
| | | */ |
| | | private void checkGenerationID(long generationID) throws DirectoryException |
| | | { |
| | | boolean flag = false; |
| | | |
| | | for (int i = 0; i< 10; i++) |
| | | { |
| | | for (RSInfo rsInfo : getRsList()) |
| | | { |
| | | if (rsInfo.getGenerationId() == generationID) |
| | | { |
| | | flag = true; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(i*100); |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | } |
| | | } |
| | | if (flag) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | |
| | | if (!flag) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reset the Replication Log. |
| | | * Calling this method will remove all the Replication information that |
| | | * was kept on all the Replication Servers currently connected in the |
| | |
| | | */ |
| | | public void resetReplicationLog() throws DirectoryException |
| | | { |
| | | // Reset the Generation ID to -1 to clean the ReplicationServers. |
| | | resetGenerationId((long)-1); |
| | | |
| | | // Check that at least one ReplicationServer switched to generation ID -1. |
| | | checkGenerationID(-1); |
| | | |
| | | // Reconnect to the Replication Server so that it adopts our |
| | | // GenerationID. |
| | | disableService(); |
| | | enableService(); |
| | | |
| | | resetGenerationId(getGenerationID()); |
| | | |
| | | // Check that at least one ReplicationServer now reports our generation ID. |
| | | checkGenerationID(getGenerationID()); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get( |
| | | serviceID); |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | |
| | | Collection<String> replicationServers, int window, |
| | | long heartbeatInterval) throws ConfigException |
| | | { |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | broker = new ReplicationBroker( |
| | | this, state, serviceID, |
| | | serverID, window, |
| | | getGenerationID(), |
| | | heartbeatInterval, |
| | | new ReplSessionSecurity(), |
| | | getGroupId()); |
| | | if (broker == null) |
| | | { |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | broker = new ReplicationBroker( |
| | | this, state, serviceID, |
| | | serverID, window, |
| | | getGenerationID(), |
| | | heartbeatInterval, |
| | | new ReplSessionSecurity(), |
| | | getGroupId()); |
| | | |
| | | broker.start(replicationServers); |
| | | broker.start(replicationServers); |
| | | |
| | | /* |
| | | * Create a replication monitor object responsible for publishing |
| | | * monitoring information below cn=monitor. |
| | | */ |
| | | monitor = new ReplicationMonitor(this); |
| | | /* |
| | | * Create a replication monitor object responsible for publishing |
| | | * monitoring information below cn=monitor. |
| | | */ |
| | | monitor = new ReplicationMonitor(this); |
| | | |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * <p> |
| | | * After this method has been called, the Replication Service will start |
| | | * calling the {@link #processUpdate(UpdateMsg)}. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(Collection, int, long)}. |
| | | * |
| | | */ |
| | | public void startListenService() |
| | | { |
| | | // |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this); |
| | | listenerThread.start(); |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Copyright 2008-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | fakeRs2.shutdown(); |
| | | fakeRs2 = null; |
| | | } |
| | | |
| | | |
| | | if (fakeRs3 != null) |
| | | { |
| | | fakeRs3.shutdown(); |
| | |
| | | { |
| | | port = rs3Port; |
| | | if (testCase.equals("testSafeDataManyRealRSs")) |
| | | { |
| | | { |
| | | // Every 3 RSs connected together |
| | | replServers.add("localhost:" + rs1Port); |
| | | replServers.add("localhost:" + rs2Port); |
| | |
| | | debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg); |
| | | ok = false; |
| | | } |
| | | |
| | | |
| | | if (ok) |
| | | debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg); |
| | | else |
| | |
| | | |
| | | /** |
| | | * Connect to RS |
| | | * Returns true if connection was made successfuly |
| | | * Returns true if connection was made successfully |
| | | */ |
| | | public boolean connect() |
| | | { |
| | |
| | | { |
| | | return everyUpdatesAreOk; |
| | | } |
| | | |
| | | |
| | | public int nReceivedUpdates() |
| | | { |
| | | return nReceivedUpdates; |
| | |
| | | { |
| | | return new Object[][] |
| | | { |
| | | |
| | | |
| | | { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}, |
| | | { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}, |
| | | { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}, |
| | |
| | | |
| | | if (objectArrayList.size() == 0) |
| | | { |
| | | // First time we add some parameters, create first object arrays |
| | | // First time we add some parameters, create first object arrays |
| | | // Add each possible parameter as initial parameter lists |
| | | for (Object possibleParameter : possibleParameters) |
| | | { |
| | |
| | | String testCase = "testSafeDataLevelHigh"; |
| | | |
| | | debugInfo("Starting " + testCase); |
| | | |
| | | |
| | | assertTrue(sdLevel > 1); |
| | | int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from |
| | | |
| | |
| | | fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel, |
| | | fakeRs3Scen); |
| | | assertNotNull(fakeRs3); |
| | | |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of fake DSs and RSs |
| | | waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3); |
| | |
| | | fail("No timeout is expected here"); |
| | | } |
| | | long sendUpdateTime = System.currentTimeMillis() - startTime; |
| | | |
| | | |
| | | // Check |
| | | sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers |
| | | checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers); |
| | |
| | | fail("No timeout is expected here"); |
| | | } |
| | | sendUpdateTime = System.currentTimeMillis() - startTime; |
| | | |
| | | |
| | | // Check |
| | | sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers |
| | | checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers); |
| | |
| | | { |
| | | fail("No timeout is expected here"); |
| | | } |
| | | sendUpdateTime = System.currentTimeMillis() - startTime; |
| | | |
| | | sendUpdateTime = System.currentTimeMillis() - startTime; |
| | | |
| | | // Check |
| | | sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers |
| | | checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers); |
| | |
| | | /* |
| | | * Send an assured update using configured assured parameters |
| | | */ |
| | | |
| | | |
| | | long startTime = System.currentTimeMillis(); |
| | | AckMsg ackMsg = null; |
| | | boolean timeout = false; |
| | |
| | | assertFalse(ackMsg.hasWrongStatus()); |
| | | assertEquals(ackMsg.getFailedServers().size(), 0); |
| | | } |
| | | |
| | | |
| | | } finally |
| | | { |
| | | endTest(); |
| | |
| | | rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase); |
| | | assertNotNull(rs3); |
| | | |
| | | |
| | | /* |
| | | * Start DS that will send updates |
| | | */ |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Copyright 2008-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | |
| | | private int exportedEntryCount; |
| | | |
| | | private long generationID = 1; |
| | | |
| | | public FakeReplicationDomain( |
| | | String serviceID, |
| | | short serverID, |
| | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return 1; |
| | | return generationID; |
| | | } |
| | | |
| | | @Override |
| | |
| | | queue.add(updateMsg); |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Replace the generation ID held by this fake domain. |
| | | * |
| | | * @param newGenerationID The value to store as the generation ID. |
| | | */ |
| | | public void setGenerationID(long newGenerationID) |
| | | { |
| | | this.generationID = newGenerationID; |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Copyright 2008-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | public void publishAndReceive() throws Exception |
| | | { |
| | | String testService = "test"; |
| | | ReplicationServer replServer = null; |
| | | int replServerID = 10; |
| | | ReplicationServer replServer1 = null; |
| | | ReplicationServer replServer2 = null; |
| | | int replServerID1 = 10; |
| | | int replServerID2 = 20; |
| | | FakeReplicationDomain domain1 = null; |
| | | FakeReplicationDomain domain2 = null; |
| | | |
| | |
| | | { |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | int replServerPort = socket.getLocalPort(); |
| | | int replServerPort1 = socket.getLocalPort(); |
| | | socket.close(); |
| | | |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration( |
| | | replServerPort, "ReplicationDomainTestDb", |
| | | 0, replServerID, 0, 100, null); |
| | | socket = TestCaseUtils.bindFreePort(); |
| | | int replServerPort2 = socket.getLocalPort(); |
| | | socket.close(); |
| | | |
| | | replServer = new ReplicationServer(conf); |
| | | TreeSet<String> replserver1 = new TreeSet<String>(); |
| | | replserver1.add("localhost:" + replServerPort1); |
| | | |
| | | TreeSet<String> replserver2 = new TreeSet<String>(); |
| | | replserver2.add("localhost:" + replServerPort2); |
| | | |
| | | ReplServerFakeConfiguration conf1 = |
| | | new ReplServerFakeConfiguration( |
| | | replServerPort1, "ReplicationDomainTestDb", |
| | | 0, replServerID1, 0, 100, replserver2); |
| | | |
| | | ReplServerFakeConfiguration conf2 = |
| | | new ReplServerFakeConfiguration( |
| | | replServerPort2, "ReplicationDomainTestDb", |
| | | 0, replServerID2, 0, 100, replserver1); |
| | | |
| | | replServer1 = new ReplicationServer(conf1);; |
| | | replServer2 = new ReplicationServer(conf2); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + replServerPort); |
| | | servers.add("localhost:" + replServerPort1); |
| | | |
| | | BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain1 = new FakeReplicationDomain( |
| | |
| | | assertNotNull(rcvdMsg); |
| | | assertEquals(test, rcvdMsg.getPayload()); |
| | | |
| | | |
| | | /* |
| | | * Now test the resetReplicationLog() method. |
| | | */ |
| | | List<RSInfo> replServers = domain1.getRsList(); |
| | | |
| | | // There should be one and only one server in the list. |
| | | assertTrue(replServers.size() == 1); |
| | | for (RSInfo replServerInfo : replServers) |
| | | { |
| | | // The generation Id of the remote should be 1 |
| | | assertTrue(replServerInfo.getGenerationId() == 1); |
| | | } |
| | | |
| | | RSInfo replServerInfo = replServers.get(0); |
| | | |
| | | // The generation Id of the remote should be 1 |
| | | assertTrue(replServerInfo.getGenerationId() == 1); |
| | | |
| | | domain1.setGenerationID(2); |
| | | domain1.resetReplicationLog(); |
| | | Thread.sleep(1000); |
| | | |
| | | replServers = domain1.getRsList(); |
| | | |
| | | // There should be one and only one server in the list. |
| | | assertTrue(replServers.size() == 1); |
| | | |
| | | replServerInfo = replServers.get(0); |
| | | |
| | | // The generation Id of the remote should now be -1 |
| | | assertTrue(replServerInfo.getGenerationId() == -1); |
| | | for (RSInfo replServerInfo : replServers) |
| | | { |
| | | // The generation Id of the remote should now be 2 |
| | | assertTrue(replServerInfo.getGenerationId() == 2); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | |
| | | if (domain2 != null) |
| | | domain2.disableService(); |
| | | |
| | | if (replServer != null) |
| | | replServer.remove(); |
| | | if (replServer1 != null) |
| | | replServer1.remove(); |
| | | |
| | | if (replServer2 != null) |
| | | replServer2.remove(); |
| | | } |
| | | } |
| | | |