Made code more encapsulated and more explicit.
Removed parameters obscuring the code.
ReplicationServerDomain.java:
Renamed getServers() to getServerIds().
MessageHandler.java:
In getDomain(), removed the createIfNotExist parameter, which was always true in calling code, and called ReplicationServer.waitConnections() here instead.
In getRcvMsgQueueSize(), used early exit.
ReplicationServer.java:
In getReplicationServerDomain() methods, removed the create parameter.
Also extracted the waitConnections() method and called it higher up in the call hierarchy.
Created the initDomainGenerationID() and addServerIdToDomain() methods here from code in ReplicationDbEnv.java.
In runConnect(), collapsed if statements.
*.java
Consequence of the changes to:
- ReplicationServerDomain.getServers()
- ReplicationServer.getReplicationServerDomain()
- ReplicationServer.initDomainGenerationID() and addServerIdToDomain()
GenerationIdTest.java:
Extracted method isDegradedDueToGenerationId().
InitOnLineTest.java:
Extracted method getConnectedDSServerIds().
| | |
| | | { |
| | | if (allowUnknownDomains) |
| | | for (String providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain, false) == null) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | | } |
| | |
| | | /** |
| | | * Returns the Replication Server Domain to which belongs this handler. |
| | | * |
| | | * @param createIfNotExist Creates the domain if it does not exist. |
| | | * @param waitConnections Waits for the Connections with other RS to |
| | | * be established before returning. |
| | | * @return The replication server domain. |
| | | */ |
| | | public ReplicationServerDomain getDomain( |
| | | boolean createIfNotExist, boolean waitConnections) |
| | | public ReplicationServerDomain getDomain(boolean waitConnections) |
| | | { |
| | | if (replicationServerDomain==null) |
| | | { |
| | | replicationServerDomain = replicationServer.getReplicationServerDomain( |
| | | baseDN, createIfNotExist, waitConnections); |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(baseDN, true); |
| | | if (waitConnections) { |
| | | replicationServer.waitConnections(); |
| | | } |
| | | } |
| | | return replicationServerDomain; |
| | | } |
| | |
| | | new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator()); |
| | | |
| | | // Build a list of candidates iterator (i.e. db i.e. server) |
| | | for (int serverId : replicationServerDomain.getServers()) |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server |
| | | ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (following) |
| | | return msgQueue.count(); |
| | | else |
| | | { |
| | | return msgQueue.count(); |
| | | } |
| | | |
| | | /* |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | | * of the servers. |
| | | * The total size of the receive queue is calculated by doing |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | * When the server is not able to follow, the msgQueue may become too |
| | | * large and therefore won't contain all the changes. Some changes may |
| | | * only be stored in the backing DB of the servers. |
| | | * The total size of the receive queue is calculated by doing the sum of |
| | | * the number of missing changes for every dbHandler. |
| | | */ |
| | | ServerState dbState = replicationServerDomain.getDbServerState(); |
| | | return ServerState.diffChanges(dbState, serverState); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the state of this server. |
| | |
| | | { |
| | | this.baseDN = baseDN; |
| | | if (!baseDN.equalsIgnoreCase("cn=changelog")) |
| | | this.replicationServerDomain = getDomain(true, isDataServer); |
| | | this.replicationServerDomain = getDomain(isDataServer); |
| | | } |
| | | } |
| | | |
| | |
| | | final LDIFExportConfig exportConfig, LDIFWriter ldifWriter, |
| | | SearchOperation searchOperation, final ChangeNumber previousCN) |
| | | { |
| | | for (int serverId : rsd.getServers()) |
| | | for (int serverId : rsd.getServerIds()) |
| | | { |
| | | if (exportConfig != null && exportConfig.isCancelled()) |
| | | { // Abort if cancelled |
| | |
| | | |
| | | // FIXME: this will need changing if we ever support listening on |
| | | // specific addresses. |
| | | if (isLocalAddress(inetAddress) && (port == replicationPort)) |
| | | { |
| | | continue; |
| | | } |
| | | |
| | | if ((isLocalAddress(inetAddress) && port == replicationPort) |
| | | // Don't connect to a server if it is already connected. |
| | | final String normalizedServerURL = normalizeServerURL(rsURL); |
| | | if (connectedRSUrls.contains(normalizedServerURL)) |
| | | || connectedRSUrls.contains(normalizeServerURL(rsURL))) |
| | | { |
| | | continue; |
| | | } |
| | |
| | | * |
| | | * @param baseDn The base Dn for which the ReplicationServerDomain must be |
| | | * returned. |
| | | * @param create Specifies whether to create the ReplicationServerDomain if |
| | | * it does not already exist. |
| | | * @return The ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | */ |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn, |
| | | boolean create) |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn) |
| | | { |
| | | return getReplicationServerDomain(baseDn, create, false); |
| | | return getReplicationServerDomain(baseDn, false); |
| | | } |
| | | |
| | | /** |
| | |
| | | * returned. |
| | | * @param create Specifies whether to create the ReplicationServerDomain if |
| | | * it does not already exist. |
| | | * @param waitConnections Waits for the Connections with other RS to |
| | | * be established before returning. |
| | | * @return The ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | */ |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn, |
| | | boolean create, boolean waitConnections) |
| | | boolean create) |
| | | { |
| | | ReplicationServerDomain domain; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | domain = baseDNs.get(baseDn); |
| | | |
| | | if (domain != null ||!create) { |
| | | return domain; |
| | | } |
| | | |
| | | ReplicationServerDomain domain = baseDNs.get(baseDn); |
| | | if (domain == null && create) { |
| | | domain = new ReplicationServerDomain(baseDn, this); |
| | | baseDNs.put(baseDn, domain); |
| | | } |
| | | return domain; |
| | | } |
| | | } |
| | | |
| | | if (waitConnections) |
| | | /** |
| | | * Waits for connections to this ReplicationServer. |
| | | */ |
| | | public void waitConnections() |
| | | { |
| | | // Acquire a domain ticket and wait for a complete cycle of the connect |
| | | // thread. |
| | |
| | | // Wait until the connect thread has processed next connect phase. |
| | | synchronized (domainTicketLock) |
| | | { |
| | | // Condition. |
| | | while (myDomainTicket > domainTicket && !shutdown) |
| | | { |
| | | try |
| | |
| | | } |
| | | } |
| | | |
| | | return domain; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Replication Server service and all its connections. |
| | | */ |
| | |
| | | */ |
| | | public long getGenerationId(String baseDN) |
| | | { |
| | | ReplicationServerDomain rsd = getReplicationServerDomain(baseDN, false); |
| | | ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); |
| | | if (rsd!=null) |
| | | return rsd.getGenerationId(); |
| | | return -1; |
| | |
| | | return "RS(" + serverId + ") on " + serverURL + ", domains=" |
| | | + baseDNs.keySet(); |
| | | } |
| | | |
| | | /** |
| | | * Initializes the generationId for the specified replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain |
| | | * @param generationId |
| | | * the generationId value for initialization |
| | | */ |
| | | public void initDomainGenerationID(String baseDn, long generationId) |
| | | { |
| | | getReplicationServerDomain(baseDn, true).initGenerationID(generationId); |
| | | } |
| | | |
| | | /** |
| | | * Adds the specified serverId to the specified replication domain. |
| | | * |
| | | * @param serverId |
| | | * the server Id to add to the replication domain |
| | | * @param baseDn |
| | | * the replication domain where to add the serverId |
| | | * @throws ChangelogException |
| | | * If a database error happened. |
| | | */ |
| | | public void addServerIdToDomain(int serverId, String baseDn) |
| | | throws ChangelogException |
| | | { |
| | | DbHandler dbHandler = newDbHandler(serverId, baseDn); |
| | | getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return a set containing the server that produced update and known by |
| | | * this replicationServer from all over the topology, |
| | | * whatever directly connected of connected to another RS. |
| | | * @return a set containing the servers known by this replicationServer. |
| | | * Returns a set containing the serverIds that produced updates and known by |
| | | * this replicationServer from all over the topology, whether directly |
| | | * connected or connected to another RS. |
| | | * |
| | | * @return a set containing the serverIds known by this replicationServer. |
| | | */ |
| | | public Set<Integer> getServers() |
| | | public Set<Integer> getServerIds() |
| | | { |
| | | return sourceDbHandlers.keySet(); |
| | | } |
| | |
| | | public String getMonitorInstanceName() |
| | | { |
| | | ReplicationServerDomain domain = replicationServer |
| | | .getReplicationServerDomain(baseDn, false); |
| | | .getReplicationServerDomain(baseDn); |
| | | return "Changelog for DS(" + serverId + "),cn=" |
| | | + domain.getMonitorInstanceName(); |
| | | } |
| | |
| | | continue; |
| | | } |
| | | |
| | | final ReplicationServerDomain domain = replicationServer |
| | | .getReplicationServerDomain(baseDN, false); |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(baseDN); |
| | | if (domain == null) |
| | | { |
| | | // the domain has been removed since the record was written in the |
| | |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Has read baseDn=" + baseDn + " generationId=" + generationId); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true) |
| | | .initGenerationID(generationId); |
| | | replicationServer.initDomainGenerationID(baseDn, generationId); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Has read: baseDn=" + baseDn + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, replicationServer, this, |
| | | replicationServer.getQueueSize()); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true) |
| | | .setDbHandler(serverId, dbHandler); |
| | | replicationServer.addServerIdToDomain(serverId, baseDn); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | |
| | | publishDeleteMsgInOTest(s2test, cn9, tn, 9); |
| | | sleep(500); |
| | | |
| | | ReplicationServerDomain rsd = |
| | | replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false); |
| | | ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); |
| | | ServerState startState = rsd.getStartState(); |
| | | assertEquals(startState.getChangeNumber(s1test.getServerId()).getSeqnum(), 1); |
| | | assertTrue(startState.getChangeNumber(s2test.getServerId()) != null); |
| | | assertEquals(startState.getChangeNumber(s2test.getServerId()).getSeqnum(), 7); |
| | | |
| | | rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false); |
| | | rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2); |
| | | startState = rsd.getStartState(); |
| | | assertEquals(startState.getChangeNumber(s2test2.getServerId()).getSeqnum(), 2); |
| | | assertEquals(startState.getChangeNumber(s1test2.getServerId()).getSeqnum(), 6); |
| | |
| | | // --- |
| | | // 2. Now set up a very short purge delay on the replication changelogs |
| | | // so that this test can play with a trimmed changelog. |
| | | d1 = replicationServer.getReplicationServerDomain("o=test", false); |
| | | d2 = replicationServer.getReplicationServerDomain("o=test2", false); |
| | | d1 = replicationServer.getReplicationServerDomain("o=test"); |
| | | d2 = replicationServer.getReplicationServerDomain("o=test2"); |
| | | d1.setPurgeDelay(1); |
| | | d2.setPurgeDelay(1); |
| | | |
| | |
| | | publishDeleteMsgInOTest(s2test, cn9, tn, 9); |
| | | sleep(500); |
| | | |
| | | ReplicationServerDomain rsd1 = |
| | | replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false); |
| | | ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); |
| | | rsd1.getDbServerState(); |
| | | rsd1.getChangeTimeHeartbeatState(); |
| | | debugInfo(tn, rsd1.getBaseDn() |
| | |
| | | + " rs eligibleCN=" + replicationServer.getEligibleCN()); |
| | | // FIXME:ECL Enable this test by adding an assert on the right value |
| | | |
| | | ReplicationServerDomain rsd2 = |
| | | replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false); |
| | | ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2); |
| | | rsd2.getDbServerState(); |
| | | rsd2.getChangeTimeHeartbeatState(); |
| | | debugInfo(tn, rsd2.getBaseDn() |
| | |
| | | final ChangeNumber cn2 = cns[1]; |
| | | final ChangeNumber cn3 = cns[2]; |
| | | |
| | | ReplicationServerDomain rsdtest = |
| | | replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false); |
| | | ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); |
| | | // this empty state will force to count from the start of the DB |
| | | final ServerState fromStart = new ServerState(); |
| | | |
| | |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import java.io.File; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.*; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationBackend; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.tasks.LdifFileWriter; |
| | | import org.opends.server.types.*; |
| | |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import java.io.File; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.*; |
| | | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | debugInfo("RS1 must have been cleared since it has not the proper generation ID"); |
| | | checkChangelogSize(0); |
| | | |
| | | assertTrue(!replServer1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server1ID), |
| | | assertFalse(isDegradedDueToGenerationId(replServer1, server1ID), |
| | | "Expecting that DS1 status in RS1 is : not in bad gen id."); |
| | | |
| | | //=============================================================== |
| | | debugInfo(testCase + " ** TEST ** Previous test set a new gen ID on the "+ |
| | | "topology, verify degradation of DS2 and DS3"); |
| | | |
| | | assertTrue(replServer1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server2ID), |
| | | assertTrue(isDegradedDueToGenerationId(replServer1, server2ID), |
| | | "Expecting that DS2 with old gen ID is in bad gen id from RS1"); |
| | | assertTrue(replServer1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server3ID), |
| | | assertTrue(isDegradedDueToGenerationId(replServer1, server3ID), |
| | | "Expecting that DS3 with old gen ID is in bad gen id from RS1"); |
| | | |
| | | debugInfo("Add entries to DS1, update should not be sent to DS2 and DS3 that are in bad gen id"); |
| | |
| | | checkChangelogSize(1); |
| | | |
| | | debugInfo("Verifying that DS2 is not in bad gen id any more"); |
| | | |
| | | assertTrue(!replServer1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server2ID), |
| | | assertFalse(isDegradedDueToGenerationId(replServer1, server2ID), |
| | | "Expecting that DS2 is not in bad gen id from RS1"); |
| | | |
| | | debugInfo("Verifying that DS3 is not in bad gen id any more"); |
| | | |
| | | assertTrue(!replServer1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server3ID), |
| | | assertFalse(isDegradedDueToGenerationId(replServer1, server3ID), |
| | | "Expecting that DS3 is not in bad gen id from RS1"); |
| | | |
| | | debugInfo("Verify that DS2 receives the add message stored in RS1 DB"); |
| | |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo( |
| | | "Expecting that broker2 is not in bad gen id since it has a correct genId"); |
| | | assertTrue(!replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server2ID)); |
| | | debugInfo("Expecting that broker2 is not in bad gen id since it has a correct genId"); |
| | | assertFalse(isDegradedDueToGenerationId(replServer1, server2ID)); |
| | | |
| | | debugInfo("Disconnecting DS from replServer1"); |
| | | disconnectFromReplServer(changelog1ID); |
| | |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo( |
| | | "Expecting that broker3 is in bad gen id since it has a bad genId"); |
| | | assertTrue(replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false). |
| | | isDegradedDueToGenerationId(server3ID)); |
| | | debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId"); |
| | | assertTrue(isDegradedDueToGenerationId(replServer1, server3ID)); |
| | | |
| | | int found = testEntriesInDb(); |
| | | assertEquals(found, updatedEntries.length, |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId) |
| | | { |
| | | ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDn.toNormalizedString()); |
| | | return domain.isDegradedDueToGenerationId(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Disconnect broker and remove entries from the local DB |
| | | */ |
| | |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.types.*; |
| | |
| | | broker3 = openReplicationSession(DN.decode(EXAMPLE_DN), |
| | | server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); |
| | | |
| | | // Check that the list of connected LDAP servers is correct |
| | | // in each replication servers |
| | | Set<Integer> l1 = changelog1.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false).getConnectedDSs().keySet(); |
| | | Assertions.assertThat(l1).containsExactly(server1ID); |
| | | |
| | | Set<Integer> l2 = changelog2.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false).getConnectedDSs().keySet(); |
| | | Assertions.assertThat(l2).containsExactly(server2ID, server3ID); |
| | | |
| | | Set<Integer> l3 = changelog3.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false).getConnectedDSs().keySet(); |
| | | Assertions.assertThat(l3).isEmpty(); |
| | | // Check that the list of connected LDAP servers is correct in each replication server |
| | | Assertions.assertThat(getConnectedDSServerIds(changelog1)).containsExactly(server1ID); |
| | | Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID, server3ID); |
| | | Assertions.assertThat(getConnectedDSServerIds(changelog3)).isEmpty(); |
| | | |
| | | // Test updates |
| | | broker3.stop(); |
| | | Thread.sleep(1000); |
| | | l2 = changelog2.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false).getConnectedDSs().keySet(); |
| | | Assertions.assertThat(l2).containsExactly(server2ID); |
| | | Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID); |
| | | |
| | | broker3 = openReplicationSession(DN.decode(EXAMPLE_DN), |
| | | server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); |
| | | broker2.stop(); |
| | | Thread.sleep(1000); |
| | | l2 = changelog2.getReplicationServerDomain( |
| | | baseDn.toNormalizedString(), false).getConnectedDSs().keySet(); |
| | | Assertions.assertThat(l2).containsExactly(server3ID); |
| | | Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server3ID); |
| | | |
| | | // TODO Test ReplicationServerDomain.getDestinationServers method. |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private Set<Integer> getConnectedDSServerIds(ReplicationServer changelog) |
| | | { |
| | | ReplicationServerDomain domain = changelog.getReplicationServerDomain(baseDn.toNormalizedString()); |
| | | return domain.getConnectedDSs().keySet(); |
| | | } |
| | | |
| | | @Test(enabled=true, groups="slow") |
| | | public void initializeTargetExportMultiSS() throws Exception |
| | | { |