opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@ import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; import java.net.*; import java.net.ConnectException; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -72,7 +75,7 @@ /** * Replication server URLs under this format: "<code>hostname:port</code>". */ private volatile Collection<String> replicationServerUrls; private volatile Set<String> replicationServerUrls; private volatile boolean connected = false; /** * String reported under CSN=monitor when there is no connected RS. @@ -98,9 +101,9 @@ /** The server id of the RS we are connected to. */ private Integer rsServerId = -1; /** The server URL of the RS we are connected to. */ private String rsServerUrl = null; private String rsServerUrl; /** Our replication domain. */ private ReplicationDomain domain = null; private ReplicationDomain domain; /** * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. @@ -121,7 +124,7 @@ /** * A thread to monitor heartbeats on the session. */ private HeartbeatMonitor heartbeatMonitor = null; private HeartbeatMonitor heartbeatMonitor; /** * The number of times the connection was lost. */ @@ -140,7 +143,7 @@ * The thread that publishes messages to the RS containing the current * change time of this DS. */ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null; private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; /** * The expected period in milliseconds between these messages are sent * to the replication server. Zero means heartbeats are off. @@ -149,8 +152,11 @@ /* * Properties for the last topology info received from the network. */ // Info for other DSs. // Warning: does not contain info for us (for our server id) /** * Info for other DSs. * <p> * Warning: does not contain info for us (for our server id) */ private volatile List<DSInfo> dsList = new ArrayList<DSInfo>(); private volatile long generationID; private volatile int updateDoneCount = 0; @@ -162,8 +168,7 @@ * replication server one wants to connect. Key: replication server id Value: * replication server info for the matching replication server id */ private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos = null; private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos; /** * This integer defines when the best replication server checking algorithm @@ -256,7 +261,7 @@ * * @param replicationServers list of servers used */ public void start(Collection<String> replicationServers) public void start(Set<String> replicationServers) { synchronized (startStopLock) { @@ -1068,9 +1073,13 @@ { // RS has no generation id return ServerStatus.NORMAL_STATUS; } else } else if (rsGenId != dsGenId) { if (rsGenId == dsGenId) // DS and RS do not have same generation id return ServerStatus.BAD_GEN_ID_STATUS; } else { /* DS and RS have same generation id @@ -1082,9 +1091,7 @@ value to determine if we are late or not */ ServerStatus initStatus; int nChanges = ServerState.diffChanges(rsState, state); if (debugEnabled()) { TRACER.debugInfo("RB for dn " + baseDN + " and with server id " @@ -1098,30 +1105,13 @@ the changeStatusFromStatusAnalyzer method. This allows to take the lock roughly only when needed versus every sleep time timeout. */ if (degradedStatusThreshold > 0) if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold) { if (nChanges >= degradedStatusThreshold) { initStatus = ServerStatus.DEGRADED_STATUS; } else { initStatus = ServerStatus.NORMAL_STATUS; return ServerStatus.DEGRADED_STATUS; } } else { /* 0 threshold value means no degrading system used (no threshold): force normal status */ initStatus = ServerStatus.NORMAL_STATUS; } return initStatus; } else { // DS and RS do not have same generation id return ServerStatus.BAD_GEN_ID_STATUS; } // degradedStatusThreshold value of '0' means no degrading system used // (no threshold): force normal status return ServerStatus.NORMAL_STATUS; } } @@ -1476,16 +1466,12 @@ { // We are not connected to a server yet return computeBestServerForWeight(bestServers, -1, -1); } else { /* We are already connected to a RS: compute the best RS as far as the weights is concerned. If this is another one, some DS must disconnect. */ return computeBestServerForWeight(bestServers, rsServerId, localServerId); } /* * We are already connected to a RS: compute the best RS as far as the * weights is concerned. If this is another one, some DS must disconnect. */ return computeBestServerForWeight(bestServers, rsServerId, localServerId); } /** @@ -1783,11 +1769,12 @@ sumOfWeights += replicationServerInfo.getWeight(); sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber(); } // Distance (difference) of the current loads to the load goals of each RS: // key:server id, value: distance Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>(); // Precision for the operations (number of digits after the dot) MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); for (Integer rsId : bestServers.keySet()) { ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); @@ -1812,8 +1799,20 @@ if (currentRsServerId == -1) { // The local server is not connected yet // The local server is not connected yet, find best server to connect to, // taking the weights into account. return computeBestServerWhenNotConnected(bestServers, loadDistances); } // The local server is currently connected to a RS, let's see if it must // disconnect or not, taking the weights into account. return computeBestServerWhenConnected(bestServers, loadDistances, localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs); } private static ReplicationServerInfo computeBestServerWhenNotConnected( Map<Integer, ReplicationServerInfo> bestServers, Map<Integer, BigDecimal> loadDistances) { /* * Find the server with the current highest distance to its load goal and * choose it. Make an exception if every server is correctly balanced, @@ -1853,11 +1852,14 @@ bestRsId = highestWeightRsId; } return bestServers.get(bestRsId); } else { // The local server is currently connected to a RS, let's see if it must // disconnect or not, taking the weights into account. } private static ReplicationServerInfo computeBestServerWhenConnected( Map<Integer, ReplicationServerInfo> bestServers, Map<Integer, BigDecimal> loadDistances, int localServerId, int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs) { final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); float currentLoadDistance = loadDistances.get(currentRsServerId).floatValue(); if (currentLoadDistance < 0) @@ -1949,8 +1951,7 @@ arrived on the new RS. But we should disconnect if we reach the perfect balance (both values are 0). */ MathContext roundMc = new MathContext(6, RoundingMode.DOWN); MathContext roundMc = new MathContext(6, RoundingMode.DOWN); BigDecimal potentialCurrentRsNewLoadDistanceBdRounded = potentialCurrentRsNewLoadDistanceBd.round(roundMc); BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded = @@ -1972,18 +1973,16 @@ ReplicationServerInfo currentRsInfo = bestServers.get(currentRsServerId); List<Integer> serversConnectedToCurrentRS = currentRsInfo.getConnectedDSs(); List<Integer> sortedServers = new ArrayList<Integer>( serversConnectedToCurrentRS); Collections.sort(sortedServers); new ArrayList<Integer>(currentRsInfo.getConnectedDSs()); Collections.sort(serversConnectedToCurrentRS); // Go through the list of DSs to disconnect and see if the local // server is part of them. int index = 0; while (overloadingDSsNumber > 0) { int severToDisconnectId = sortedServers.get(index); if (severToDisconnectId == localServerId) int serverIdToDisconnect = serversConnectedToCurrentRS.get(index); if (serverIdToDisconnect == localServerId) { // The local server is part of the DSs to disconnect return null; @@ -2005,7 +2004,6 @@ } return bestServers.get(currentRsServerId); } } /** * Start the heartbeat monitor thread. @@ -2117,7 +2115,7 @@ */ public void publish(ReplicationMsg msg) { _publish(msg, false, true); publish(msg, false, true); } /** @@ -2128,7 +2126,7 @@ */ public boolean publish(ReplicationMsg msg, boolean retryOnFailure) { return _publish(msg, false, retryOnFailure); return publish(msg, false, retryOnFailure); } /** @@ -2137,7 +2135,7 @@ */ public void publishRecovery(ReplicationMsg msg) { _publish(msg, true, true); publish(msg, true, true); } /** @@ -2147,7 +2145,7 @@ * @param retryOnFailure whether retry should be done on failure * @return whether the message was successfully sent. */ boolean _publish(ReplicationMsg msg, boolean recoveryMsg, private boolean publish(ReplicationMsg msg, boolean recoveryMsg, boolean retryOnFailure) { boolean done = false; @@ -2175,10 +2173,6 @@ try { boolean credit; Session current_session; Semaphore currentWindowSemaphore; /* save the session at the time when we acquire the sendwindow credit so that we can make sure later @@ -2187,9 +2181,11 @@ on a session with a credit that was acquired from a previous session. */ Session currentSession; Semaphore currentWindowSemaphore; synchronized (connectPhaseLock) { current_session = session; currentSession = session; currentWindowSemaphore = sendWindow; } @@ -2204,6 +2200,7 @@ return false; } boolean credit; if (msg instanceof UpdateMsg) { /* @@ -2217,6 +2214,7 @@ { credit = true; } if (credit) { synchronized (connectPhaseLock) @@ -2228,8 +2226,7 @@ reconnection happened and we need to restart from scratch. */ if ((session != null) && (session == current_session)) if (session != null && session == currentSession) { session.publish(msg); done = true; @@ -2340,7 +2337,7 @@ break; } final int replicationServerID = rsServerId; final int previousRsServerID = rsServerId; try { ReplicationMsg msg = savedSession.receive(); @@ -2375,7 +2372,7 @@ { // RS performs a proper disconnection Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( replicationServerID, savedSession.getReadableRemoteAddress(), previousRsServerID, savedSession.getReadableRemoteAddress(), serverId, baseDN.toNormalizedString()); logError(message); @@ -2425,13 +2422,12 @@ { // Stable topology (no topo msg since few seconds): proceed with // best server checking. ReplicationServerInfo bestServerInfo = computeBestReplicationServer(false, rsServerId, state, replicationServerInfos, serverId, groupId, generationID); if ((rsServerId != -1) && ((bestServerInfo == null) || (bestServerInfo.getServerId() != rsServerId))) final ReplicationServerInfo bestServerInfo = computeBestReplicationServer(false, previousRsServerID, state, replicationServerInfos, serverId, groupId, generationID); if (previousRsServerID != -1 && (bestServerInfo == null || bestServerInfo.getServerId() != previousRsServerID)) { // The best replication server is no more the one we are // currently using. Disconnect properly then reconnect. @@ -2439,14 +2435,14 @@ if (bestServerInfo == null) { message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( serverId, replicationServerID, serverId, previousRsServerID, savedSession.getReadableRemoteAddress(), baseDN.toNormalizedString()); } else { message = NOTE_NEW_BEST_REPLICATION_SERVER.get( serverId, replicationServerID, serverId, previousRsServerID, savedSession.getReadableRemoteAddress(), bestServerInfo.getServerId(), baseDN.toNormalizedString()); @@ -2483,7 +2479,7 @@ { // We did not initiate the close on our side, log an error message. Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( serverId, baseDN.toNormalizedString(), replicationServerID, serverId, baseDN.toNormalizedString(), previousRsServerID, savedSession.getReadableRemoteAddress()); logError(message); } @@ -2684,9 +2680,8 @@ * requires to restart the service. * @param groupId The new group id to use */ public boolean changeConfig( Collection<String> replicationServers, int window, long heartbeatInterval, byte groupId) public boolean changeConfig(Set<String> replicationServers, int window, long heartbeatInterval, byte groupId) { // These parameters needs to be renegotiated with the ReplicationServer // so if they have changed, that requires restarting the session with @@ -2695,8 +2690,7 @@ // the connection is modified boolean needToRestartSession = this.replicationServerUrls == null || replicationServers.size() != this.replicationServerUrls.size() || !replicationServers.containsAll(this.replicationServerUrls) || !replicationServers.equals(this.replicationServerUrls) || window != this.maxRcvWindow || heartbeatInterval != this.heartbeatInterval || groupId != this.groupId; @@ -2788,12 +2782,10 @@ */ public List<RSInfo> getRsList() { List<RSInfo> result = new ArrayList<RSInfo>(); for (ReplicationServerInfo replicationServerInfo : replicationServerInfos.values()) final List<RSInfo> result = new ArrayList<RSInfo>(); for (ReplicationServerInfo rsInfo : replicationServerInfos.values()) { result.add(replicationServerInfo.toRSInfo()); result.add(rsInfo.toRSInfo()); } return result; } @@ -2989,14 +2981,14 @@ } /** * Returns the replication monitor associated with this broker. * Returns the replication monitor instance name associated with this broker. * * @return The replication monitor. * @return The replication monitor instance name. */ ReplicationMonitor getReplicationMonitor() String getReplicationMonitorInstanceName() { // Only invoked by replication domain so always non-null. return monitor; return monitor.getMonitorInstanceName(); } private void setSession(final Session newSession) opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1478,7 +1478,7 @@ } if (debugEnabled()) TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName() TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName() + " export ends with " + " connected=" + broker.isConnected() + " exportRootException=" + exportRootException); @@ -1585,11 +1585,6 @@ } } private String getReplicationMonitorInstanceName() { return broker.getReplicationMonitor().getMonitorInstanceName(); } /** * For all remote servers in the start list: * - wait it has finished the import and present the expected generationID, @@ -1835,7 +1830,8 @@ msg = broker.receive(false, false, true); if (debugEnabled()) TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName() TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName() + ", receiveEntryBytes " + msg); if (msg == null) @@ -1888,7 +1884,7 @@ broker.publish(amsg, false); if (debugEnabled()) TRACER.debugInfo("[IE] In " + getReplicationMonitorInstanceName() + broker.getReplicationMonitorInstanceName() + ", publish InitializeRcvAckMsg" + amsg); } } @@ -2072,13 +2068,12 @@ TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry)); // publish the message boolean sent = broker.publish(entryMessage, false); // process any publish error if (((!sent)|| (broker.hasConnectionError()))|| (broker.getNumLostConnections() != ieContext.initNumLostConnections)) if (!sent || broker.hasConnectionError() || broker.getNumLostConnections() != ieContext.initNumLostConnections) { // publish failed - store the error in the ieContext ... DirectoryException de = new DirectoryException(ResultCode.OTHER, @@ -2125,8 +2120,7 @@ * @throws DirectoryException If it was not possible to publish the * Initialization message to the Topology. */ public void initializeFromRemote(int source) throws DirectoryException public void initializeFromRemote(int source) throws DirectoryException { initializeFromRemote(source, null); } @@ -2966,8 +2960,7 @@ * @throws ConfigException If the DirectoryServer configuration was * incorrect. */ public void startPublishService( Collection<String> replicationServers, int window, public void startPublishService(Set<String> replicationServers, int window, long heartbeatInterval, long changetimeHeartbeatInterval) throws ConfigException { @@ -3078,18 +3071,15 @@ /** * Change some ReplicationDomain parameters. * * @param replicationServers The new list of Replication Servers that this * @param replicationServers The new set of Replication Servers that this * domain should now use. * @param windowSize The window size that this domain should use. * @param heartbeatInterval The heartbeatInterval that this domain should * use. * @param groupId The new group id to use */ public void changeConfig( Collection<String> replicationServers, int windowSize, long heartbeatInterval, byte groupId) public void changeConfig(Set<String> replicationServers, int windowSize, long heartbeatInterval, byte groupId) { this.groupId = groupId; @@ -3576,15 +3566,13 @@ Set<String> s2 = new HashSet<String>(s1); s2.addAll(includeAttributesForDeletes); Set<String> s = eclIncludesByServer.get(serverId); if (!s1.equals(s)) if (!s1.equals(eclIncludesByServer.get(serverId))) { configurationChanged = true; eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); } s = eclIncludesForDeletesByServer.get(serverId); if (!s2.equals(s)) if (!s2.equals(eclIncludesForDeletesByServer.get(serverId))) { configurationChanged = true; eclIncludesForDeletesByServer.put(serverId, @@ -3592,7 +3580,7 @@ } // and rebuild the global list to be ready for usage s = new HashSet<String>(); Set<String> s = new HashSet<String>(); for (Set<String> attributes : eclIncludesByServer.values()) { s.addAll(attributes); opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1922,4 +1922,20 @@ dsconfig("set-backend-prop", "--backend-name", backendID, "--set", "enabled:" + enabled); } public static <T> Set<T> newSet(T... elems) { return new HashSet<T>(Arrays.asList(elems)); } public static <T> SortedSet<T> newSortedSet(T... elems) { return new TreeSet<T>(Arrays.asList(elems)); } public static <T> List<T> newList(T... elems) { return new ArrayList<T>(Arrays.asList(elems)); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -236,7 +236,7 @@ private void connect(ReplicationBroker broker, int port, int timeout) throws Exception { broker.start(Collections.singletonList("localhost:" + port)); broker.start(Collections.singleton("localhost:" + port)); // give some time to the broker to connect to the replicationServer. checkConnection(30, broker, port); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -375,8 +375,7 @@ private void createFakeReplicationDomain(boolean firstBackend, long generationId) throws Exception { List<String> replicationServers = new ArrayList<String>(); replicationServers.add("localhost:" + replServerPort); Set<String> replicationServers = newSet("localhost:" + replServerPort); DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING); replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId); @@ -566,12 +565,8 @@ private int exportedEntryCount; private long generationID = -1; public FakeReplicationDomain( DN baseDN, int serverID, Collection<String> replicationServers, int window, long heartbeatInterval, public FakeReplicationDomain(DN baseDN, int serverID, Set<String> replicationServers, int window, long heartbeatInterval, long generationId) throws ConfigException { super(baseDN, serverID, 100); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,10 +29,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Category; @@ -229,9 +226,7 @@ ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true); ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_, dsId, 100, generationId, 0, security, (byte) 1, 500); List<String> servers = new ArrayList<String>(1); servers.add("localhost:" + rs1Port); broker.start(servers); broker.start(Collections.singleton("localhost:" + rs1Port)); checkConnection(30, broker, rs1Port); return broker; } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -904,16 +904,6 @@ }; } private <T> Set<T> newSet(T... elems) { return new HashSet<T>(Arrays.asList(elems)); } private <T> List<T> newList(T... elems) { return Arrays.asList(elems); } /** * Test TopologyMsg encoding and decoding. */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -300,8 +300,7 @@ (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario, serverState); List<String> replicationServers = new ArrayList<String>(); replicationServers.add("localhost:" + rsPort); Set<String> replicationServers = newSet("localhost:" + rsPort); fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500); if (startListen) fakeReplicationDomain.startListenService(); @@ -309,8 +308,7 @@ // Test connection assertTrue(fakeReplicationDomain.isConnected()); // Check connected server port HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer()); HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer()); assertEquals(rd.getPort(), rsPort); return fakeReplicationDomain; @@ -1253,17 +1251,9 @@ // Fake RS 3 scenario objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO); Object[][] result = new Object[objectArrayList.size()][]; int i = 0; for (List<Object> objectArray : objectArrayList) { result[i] = objectArray.toArray(); i++; } debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i); return result; debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + objectArrayList.size()); return toDataProvider(objectArrayList); } /** @@ -1862,11 +1852,16 @@ // Fake RS sends update in assured mode objectArrayList = addPossibleParameters(objectArrayList, true, false); Object[][] result = new Object[objectArrayList.size()][]; int i = 0; for (List<Object> objectArray : objectArrayList) return toDataProvider(objectArrayList); } private Object[][] toDataProvider(List<List<Object>> listOfList) { result[i] = objectArray.toArray(); Object[][] result = new Object[listOfList.size()][]; int i = 0; for (List<Object> list : listOfList) { result[i] = list.toArray(); i++; } return result; @@ -2292,14 +2287,7 @@ // Other additional RS scenario objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ); Object[][] result = new Object[objectArrayList.size()][]; int i = 0; for (List<Object> objectArray : objectArrayList) { result[i] = objectArray.toArray(); i++; } return result; return toDataProvider(objectArrayList); } /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -616,7 +616,7 @@ // Add the root entry in the backend backend2 = initializeTestBackend(false, backendId2); backend2.setPrivateBackend(true); SortedSet<String> replServers = newSet("localhost:" + replicationServerPort); SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort); DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers); domain2 = startNewDomain(domainConf, null,null); @@ -2870,10 +2870,10 @@ // Add the root entry in the backend backend2 = initializeTestBackend(false, TEST_BACKEND_ID2); SortedSet<String> replServers = newSet("localhost:" + replicationServerPort); SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort); // on o=test2,sid=1702 include attrs set to : 'sn' SortedSet<String> eclInclude = newSet("sn", "roomnumber"); SortedSet<String> eclInclude = newSortedSet("sn", "roomnumber"); DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers); domain2 = startNewDomain(domainConf, eclInclude, eclInclude); @@ -2881,15 +2881,15 @@ backend3 = initializeTestBackend(false, backendId3); // on o=test3,sid=1703 include attrs set to : 'objectclass' eclInclude = newSet("objectclass"); eclInclude = newSortedSet("objectclass"); SortedSet<String> eclIncludeForDeletes = newSet("*"); SortedSet<String> eclIncludeForDeletes = newSortedSet("*"); domainConf = new DomainFakeCfg(baseDN3, 1703, replServers); domain3 = startNewDomain(domainConf, eclInclude, eclIncludeForDeletes); // on o=test2,sid=1704 include attrs set to : 'cn' eclInclude = newSet("cn"); eclInclude = newSortedSet("cn"); domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers); domain21 = startNewDomain(domainConf, eclInclude, eclInclude); @@ -3021,11 +3021,6 @@ } } private static SortedSet<String> newSet(String... values) { return new TreeSet<String>(Arrays.asList(values)); } private LDAPReplicationDomain startNewDomain(DomainFakeCfg domainConf, SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes) throws Exception opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -30,7 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,12 +68,8 @@ private long generationID = 1; public FakeReplicationDomain( DN baseDN, int serverID, Collection<String> replicationServers, int window, long heartbeatInterval, public FakeReplicationDomain(DN baseDN, int serverID, Set<String> replicationServers, int window, long heartbeatInterval, BlockingQueue<UpdateMsg> queue) throws ConfigException { super(baseDN, serverID, 100); @@ -82,15 +78,10 @@ this.queue = queue; } public FakeReplicationDomain( DN baseDN, int serverID, Collection<String> replicationServers, int window, long heartbeatInterval, String exportString, StringBuilder importString, int exportedEntryCount) throws ConfigException public FakeReplicationDomain(DN baseDN, int serverID, Set<String> replicationServers, int window, long heartbeatInterval, String exportString, StringBuilder importString, int exportedEntryCount) throws ConfigException { super(baseDN, serverID, 100); startPublishService(replicationServers, window, heartbeatInterval, 500); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -27,12 +27,10 @@ */ package org.opends.server.replication.service; import static org.opends.messages.ReplicationMessages.*; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +40,8 @@ import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import static org.opends.messages.ReplicationMessages.*; /** * This class is the minimum implementation of a Concrete ReplicationDomain * used to test the Generic Replication Service. @@ -55,12 +55,8 @@ */ private BlockingQueue<UpdateMsg> queue = null; public FakeStressReplicationDomain( DN baseDN, int serverID, Collection<String> replicationServers, int window, long heartbeatInterval, public FakeStressReplicationDomain(DN baseDN, int serverID, Set<String> replicationServers, int window, long heartbeatInterval, BlockingQueue<UpdateMsg> queue) throws ConfigException { super(baseDN, serverID, 100); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -45,6 +45,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.testng.Assert.*; /** @@ -92,16 +93,13 @@ replServer2 = createReplicationServer(replServerID2, replServerPort2, "ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1); List<String> servers = new ArrayList<String>(1); servers.add("localhost:" + replServerPort1); Set<String> servers = newSet("localhost:" + replServerPort1); BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); domain1 = new FakeReplicationDomain( testService, domain1ServerId, servers, 100, 1000, rcvQueue1); List<String> servers2 = new ArrayList<String>(1); servers2.add("localhost:" + replServerPort2); Set<String> servers2 = newSet("localhost:" + replServerPort2); BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>(); domain2 = new FakeReplicationDomain( testService, domain2ServerId, servers2, 100, 1000, rcvQueue2); @@ -218,9 +216,7 @@ replServer1 = createReplicationServer(replServerID1, replServerPort, "ReplicationDomainTestDb", 100000, "localhost:" + replServerPort); List<String> servers = new ArrayList<String>(1); servers.add("localhost:" + replServerPort); Set<String> servers = newSet("localhost:" + replServerPort); BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); domain1 = new FakeReplicationDomain( testService, domain1ServerId, servers, 1000, 100000, rcvQueue1); @@ -321,8 +317,7 @@ replServer = createReplicationServer(replServerID, replServerPort, "exportAndImportData", 100); List<String> servers = new ArrayList<String>(1); servers.add("localhost:" + replServerPort); Set<String> servers = newSet("localhost:" + replServerPort); StringBuilder exportedDataBuilder = new StringBuilder(); for (int i =0; i<ENTRYCOUNT; i++) @@ -399,11 +394,8 @@ replServer2 = createReplicationServer(replServerID2, replServerPort2, "exportAndImportservice2", 100, "localhost:" + replServerPort1); List<String> servers1 = new ArrayList<String>(1); servers1.add("localhost:" + replServerPort1); List<String> servers2 = new ArrayList<String>(1); servers2.add("localhost:" + replServerPort2); Set<String> servers1 = newSet("localhost:" + replServerPort1); Set<String> servers2 = newSet("localhost:" + replServerPort2); StringBuilder exportedDataBuilder = new StringBuilder(); for (int i =0; i<ENTRYCOUNT; i++)