opends/resource/schema/02-config.ldif
@@ -2458,6 +2458,11 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.605 NAME 'ds-cfg-monitoring-period' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top @@ -3137,7 +3142,8 @@ ds-cfg-group-id $ ds-cfg-assured-timeout $ ds-cfg-degraded-status-threshold $ ds-cfg-weight) ds-cfg-weight $ ds-cfg-monitoring-period) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME 'ds-backup-directory' opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -296,4 +296,27 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="monitoring-period" mandatory="false"> <adm:synopsis> The period between sending of monitoring messages. </adm:synopsis> <adm:description> Defines the amount of milliseconds the replication server will wait before sending new monitoring messages to its peers (replication servers and directory servers). </adm:description> <adm:default-behavior> <adm:defined> <adm:value>3000ms</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:duration base-unit="ms" lower-limit="1000" /> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-monitoring-period</ldap:name> </ldap:attribute> </adm:profile> </adm:property> </adm:managed-object> opends/src/messages/messages/replication.properties
@@ -170,7 +170,7 @@ UTF-8. This is required to be able to encode the changes in the database. \ This replication server will now shutdown SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \ suffix %s but was not able to connect to any Replication Server suffix %s but was not able to connect to any Replication Server NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \ for domain %s with replication server id %s %s - local server id is %s - data \ generation is %s opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -247,23 +247,23 @@ StringBuffer sb = new StringBuffer(); sb.append("DS id: "); sb.append(dsId); sb.append(" RS id: "); sb.append(" ; RS id: "); sb.append(rsId); sb.append(" Generation id: "); sb.append(" ; Generation id: "); sb.append(generationId); sb.append(" Status: "); sb.append(" ; Status: "); sb.append(status); sb.append(" Assured replication: "); sb.append(" ; Assured replication: "); sb.append(assuredFlag); sb.append(" Assured mode: "); sb.append(" ; Assured mode: "); sb.append(assuredMode); sb.append(" Safe data level: "); sb.append(" ; Safe data level: "); sb.append(safeDataLevel); sb.append(" Group id: "); sb.append(" ; Group id: "); sb.append(groupId); sb.append(" Referral URLs: "); sb.append(" ; Referral URLs: "); sb.append(refUrls); sb.append(" ECL Include: "); sb.append(" ; ECL Include: "); sb.append(eclIncludes); return sb.toString(); } opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -40,6 +40,11 @@ private long generationId = -1; // Group id of the RS private byte groupId = (byte) -1; // The weight of the RS // It is important to keep the default value to 1 so that it is used as // default value for a RS using protocol V3: this default value will be used // in algorithms that use weight private int weight = 1; /** * Creates a new instance of RSInfo with every given info. @@ -47,12 +52,14 @@ * @param id The RS id * @param generationId The generation id the RS is using * @param groupId RS group id * @param weight RS weight */ public RSInfo(int id, long generationId, byte groupId) public RSInfo(int id, long generationId, byte groupId, int weight) { this.id = id; this.generationId = generationId; this.groupId = groupId; this.weight = weight; } /** @@ -83,6 +90,16 @@ } /** * Get the RS weight. * @return The RS weight */ public int getWeight() { return weight; } /** * Test if the passed object is equal to this one. * @param obj The object to test * @return True if both objects are equal @@ -99,7 +116,8 @@ RSInfo rsInfo = (RSInfo) obj; return ((id == rsInfo.getId()) && (generationId == rsInfo.getGenerationId()) && (groupId == rsInfo.getGroupId())); (groupId == rsInfo.getGroupId()) && (weight == rsInfo.getWeight())); } else { return false; @@ -117,6 +135,7 @@ hash = 37 * hash + this.id; hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32)); hash = 37 * hash + this.groupId; hash = 37 * hash + this.weight; return hash; } @@ -130,11 +149,12 @@ StringBuffer sb = new StringBuffer(); sb.append("Id: "); sb.append(id); sb.append(" Generation id: "); sb.append(" ; Generation id: "); sb.append(generationId); sb.append(" Group id: "); sb.append(" ; Group id: "); sb.append(groupId); sb.append(" ; Weight: "); sb.append(weight); return sb.toString(); } } opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -93,6 +93,24 @@ } /** * Sets the sender ID. * @param senderID The sender ID. */ public void setSenderID(int senderID) { this.senderID = senderID; } /** * Sets the destination. * @param destination The destination. */ public void setDestination(int destination) { this.destination = destination; } /** * Sets the state of the replication server. * @param state The state. */ opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -31,7 +31,7 @@ /** * This message is part of the replication protocol. * RS1 sends a MonitorRequestMsg to RS2 to requests its monitoring * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring * informations. * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a * MonitorMessage. opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -29,7 +29,7 @@ /** * This is an abstract class of messages of the replication protocol * for message that needs to contain information about the server that * send them and the destination servers to whitch they should be sent. * send them and the destination servers to which they should be sent. */ public abstract class RoutableMsg extends ReplicationMsg { opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -167,6 +167,7 @@ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // Put ECL includes Set<String> attrs = dsInfo.getEclIncludes(); oStream.write(attrs.size()); for (String attr : attrs) @@ -192,8 +193,15 @@ oStream.write(String.valueOf(rsInfo.getGenerationId()). getBytes("UTF-8")); oStream.write(0); // Put DS group id // Put RS group id oStream.write(rsInfo.getGroupId()); if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // Put RS weight oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8")); oStream.write(0); } } return oStream.toByteArray(); @@ -332,23 +340,30 @@ int length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); int id = Integer.valueOf(serverIdString); pos += length + 1; pos += length + 1; /* Read the generation id */ length = getNextLength(in, pos); long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; pos += length + 1; /* Read RS group id */ byte groupId = in[pos++]; int weight = 1; if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { /* Read RS weight */ length = getNextLength(in, pos); weight = Integer.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; } /* Now create RSInfo and store it in list */ RSInfo rsInfo = new RSInfo(id, generationId, groupId); RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight); rsList.add(rsInfo); nRsInfo--; opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -309,8 +309,7 @@ try { MonitorData md; md = replicationServerDomain.computeMonitorData(); MonitorData md = replicationServerDomain.computeMonitorData(); // Oldest missing update Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); @@ -538,7 +537,7 @@ return; } // Send our own TopologyMsg to remote RS // Send our own TopologyMsg to remote DS TopologyMsg outTopoMsg = sendTopoToRemoteDS(); logStartSessionHandshake(inStartSessionMsg, outTopoMsg); @@ -572,6 +571,9 @@ // Create the status analyzer for the domain if not already started createStatusAnalyzer(); // Create the monitoring publisher for the domain if not already started createMonitoringPublisher(); registerIntoDomain(); super.finalizeStart(); opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opends.messages.Category; import org.opends.messages.Message; @@ -101,6 +99,7 @@ import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import com.sleepycat.je.DatabaseException; import java.util.Collections; /** * ReplicationServer Listener. @@ -173,6 +172,10 @@ // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled private int degradedStatusThreshold = 5000; // Number of milliseconds to wait before sending new monitoring messages. // If value is 0, monitoring publisher is disabled private long monitoringPublisherPeriod = 3000; // The handler of the draft change numbers database, the database used to // store the relation between a draft change number ('seqnum') and the // associated cookie. @@ -211,6 +214,13 @@ private int weight = 1; /** * Holds the list of all replication servers instantiated in this VM. * This allows to perform clean up of the RS databases in unit tests. */ private static List<ReplicationServer> allInstances = new ArrayList<ReplicationServer>(); /** * Creates a new Replication server using the provided configuration entry. * * @param configuration The configuration of this replication server. @@ -254,6 +264,7 @@ groupId = (byte)configuration.getGroupId(); assuredTimeout = configuration.getAssuredTimeout(); degradedStatusThreshold = configuration.getDegradedStatusThreshold(); monitoringPublisherPeriod = configuration.getMonitoringPeriod(); replSessionSecurity = new ReplSessionSecurity(); initialize(replicationPort); @@ -274,8 +285,20 @@ DirectoryServer.registerImportTaskListener(this); localPorts.add(replicationPort); // Keep track of this new instance allInstances.add(this); } /** * Get the list of every replication servers instantiated in the current VM. 
* @return The list of all replication servers instantiated in the current * VM. */ public static List<ReplicationServer> getAllInstances() { return allInstances; } /** * The run method for the Listen thread. @@ -850,7 +873,9 @@ dbEnv.shutdown(); } } // Remove this instance from the global instance list allInstances.remove(this); } /** @@ -1028,6 +1053,32 @@ } } // Update period value for monitoring publishers (stop them if requested // value is 0) if (monitoringPublisherPeriod != configuration.getMonitoringPeriod()) { long oldMonitoringPeriod = monitoringPublisherPeriod; monitoringPublisherPeriod = configuration.getMonitoringPeriod(); for(ReplicationServerDomain rsd : baseDNs.values()) { if (monitoringPublisherPeriod == 0L) { // Requested to stop monitoring publishers rsd.stopMonitoringPublisher(); } else if (rsd.isRunningMonitoringPublisher()) { // Update the period value for this running monitoring publisher rsd.updateMonitoringPublisher(monitoringPublisherPeriod); } else if (oldMonitoringPeriod == 0L) { // Requested to start monitoring publishers with provided period value if ( (rsd.getConnectedDSs().size() > 0) || (rsd.getConnectedRSs().size() > 0) ) rsd.startMonitoringPublisher(); } } } // Changed the group id ? byte newGroupId = (byte)configuration.getGroupId(); if (newGroupId != groupId) @@ -1044,7 +1095,10 @@ if (weight != configuration.getWeight()) { weight = configuration.getWeight(); // TODO: send new TopologyMsg // Broadcast the new weight to the whole topology. This will make some // DSs reconnect (if needed) to other RSs according to the new weight of // this RS. broadcastConfigChange(); } if ((configuration.getReplicationDBDirectory() != null) && @@ -1057,6 +1111,19 @@ } /** * Broadcast a configuration change that just happened to the whole topology * by sending a TopologyMsg to every entity in the topology.
*/ private void broadcastConfigChange() { for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) { replicationServerDomain.buildAndSendTopoInfoToDSs(null); replicationServerDomain.buildAndSendTopoInfoToRSs(); } } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable( @@ -1345,6 +1412,15 @@ } /** * Get the monitoring publisher period value. * @return the monitoring publisher period value. */ public long getMonitoringPublisherPeriod() { return monitoringPublisherPeriod; } /** * Compute the list of replication servers that are not any * more connected to this Replication Server and stop the * corresponding handlers. @@ -1411,12 +1487,80 @@ /* The date of the last time they have been elaborated */ private long monitorDataLastBuildDate = 0; /* Search op on monitor data is processed by a worker thread. * Requests are sent to the other RS,and responses are received by the * listener threads. * The worker thread is awoke on this semaphore, or on timeout. /** * This uniquely identifies a server (handler) in the cross-domain topology. * It identifies a handler (in the whole RS) that we have to wait for a * monitoring message from before answering a monitor request. */ Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0); public static class GlobalServerId { private int serverId = -1; private String baseDn = null; /** * Constructor for a global server id. * @param baseDn The dn of the RSD owning the handler. * @param serverId The handler id in the matching RSD. */ public GlobalServerId(String baseDn, int serverId) { this.baseDn = baseDn; this.serverId = serverId; } /** * Get the server handler id. * @return the serverId */ public int getServerId() { return serverId; } /** * Get the base dn. * @return the baseDn */ public String getBaseDn() { return baseDn; } /** * Get the hash code, computed from the server id and the base dn. * @return The hash code.
*/ @Override public int hashCode() { int hash = 7; hash = 43 * hash + this.serverId; hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0); return hash; } /** * Tests if the passed global server handler id represents the same server * handler as this one. * @param obj The object to test. * @return True if obj is a GlobalServerId with the same base dn and the same * server id as this one. */ @Override public boolean equals(Object obj) { if (this == obj) return true; /* instanceof is also false for null. This test must be negated, otherwise no two ids are ever equal and List.remove() in responseReceived() never matches. */ if (!(obj instanceof GlobalServerId)) return false; GlobalServerId globalServerId = (GlobalServerId)obj; /* Null-safe on baseDn, consistent with hashCode(). */ return ( ((baseDn == null) ? (globalServerId.baseDn == null) : baseDn.equals(globalServerId.baseDn)) && (globalServerId.serverId == serverId) ); } } /** * This gives the list of server handlers we are willing to wait monitoring * message from. Each time a monitoring message is received by a server * handler, the matching server handler id is retired from the list. When the * list is empty, we received all expected monitoring messages. * The reference is volatile because it is reassigned by computeMonitorData() * while other threads read it when delivering monitoring responses. */ private volatile List<GlobalServerId> expectedMonitoringMsg = null; /** * Trigger the computation of the Global Monitoring Data. @@ -1429,7 +1573,7 @@ * * @throws DirectoryException If the computation cannot be achieved. */ public void computeMonitorData() throws DirectoryException public synchronized void computeMonitorData() throws DirectoryException { if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime()) { @@ -1440,15 +1584,17 @@ return; } remoteMonitorResponsesSemaphore.drainPermits(); int count = 0; // Initialize the list of server handlers we expect monitoring messages from expectedMonitoringMsg = Collections.synchronizedList(new ArrayList<GlobalServerId>()); for (ReplicationServerDomain domain : baseDNs.values()) { count += domain.initializeMonitorData(); domain.initializeMonitorData(expectedMonitoringMsg); } // Wait for responses waitMonitorDataResponses(count); waitMonitorDataResponses(); for (ReplicationServerDomain domain : baseDNs.values()) { @@ -1457,38 +1603,51 @@ } /** * Wait for the expected count of received MonitorMsg.
* @param expectedResponses The number of expected answers. * Wait for the expected received MonitorMsg. * @throws DirectoryException When an error occurs. */ private void waitMonitorDataResponses(int expectedResponses) private void waitMonitorDataResponses() throws DirectoryException { try { if (debugEnabled()) TRACER.debugInfo( "In " + getMonitorInstanceName() + " baseDn=" + " waiting for " + expectedResponses + " expected monitor messages"); "In " + getMonitorInstanceName() + " waiting for " + expectedMonitoringMsg.size() + " expected monitor messages"); boolean allPermitsAcquired = remoteMonitorResponsesSemaphore.tryAcquire( expectedResponses, (long) 5000, TimeUnit.MILLISECONDS); if (!allPermitsAcquired) // Wait up to 5 seconds for every expected monitoring message to come // back. boolean allReceived = false; long startTime = TimeThread.getTime(); long curTime = startTime; int maxTime = 5000; while ( (curTime - startTime) < maxTime ) { monitorDataLastBuildDate = TimeThread.getTime(); // Have every expected monitoring messages arrived ? if (expectedMonitoringMsg.size() == 0) { // Ok break the loop allReceived = true; break; } Thread.sleep(100); curTime = TimeThread.getTime(); } monitorDataLastBuildDate = TimeThread.getTime(); if (!allReceived) { logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); // let's go on in best effort even with limited data received. // let's go on in best effort even with limited data received. } else { monitorDataLastBuildDate = TimeThread.getTime(); if (debugEnabled()) TRACER.debugInfo( "In " + getMonitorInstanceName() + " baseDn=" + " Successfully received all " + expectedResponses + " expected monitor messages"); "In " + getMonitorInstanceName() + " Successfully received all expected monitor messages"); } } catch (Exception e) { @@ -1499,11 +1658,18 @@ /** * This should be called by each ReplicationServerDomain that receives * a response to a monitor request message. * a response to a monitor request message. 
This may also be called when a * monitoring message is coming from a RS whose monitoring publisher thread * sent it. As monitoring messages (sent because of monitoring request or * because of monitoring publisher) have the same content, it is also fine * to mark the server as having answered when the monitoring message comes * from a monitoring publisher thread. * @param globalServerId The server handler that is receiving the * monitoring message. */ public void responseReceived() public void responseReceived(GlobalServerId globalServerId) { remoteMonitorResponsesSemaphore.release(); /* expectedMonitoringMsg stays null until the first computeMonitorData() call, but a MonitorMsg pushed by a peer's monitoring publisher can arrive before that: guard against a NullPointerException. The field is read once so that a concurrent reassignment cannot happen between the check and the use. */ List<GlobalServerId> expected = expectedMonitoringMsg; if (expected != null) { expected.remove(globalServerId); } } @@ -1513,7 +1679,7 @@ */ public void responseReceivedAll() { remoteMonitorResponsesSemaphore.notifyAll(); /* Same null guard as in responseReceived(). */ List<GlobalServerId> expected = expectedMonitoringMsg; if (expected != null) { expected.clear(); } } /** opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@ import org.opends.server.types.ResultCode; import com.sleepycat.je.DatabaseException; import org.opends.server.replication.server. ReplicationServer.GlobalServerId; /** * This class define an in-memory cache that will be used to store @@ -109,6 +111,10 @@ // late or not private StatusAnalyzer statusAnalyzer = null; // The monitoring publisher that periodically sends monitoring messages to the // topology private MonitoringPublisher monitoringPublisher = null; /* * The following map contains one balanced tree for each replica ID * to which we are currently publishing @@ -1066,6 +1072,17 @@ // Try doing job anyway... } // Stop useless monitoring publisher if no more RS or DS in domain if ( (directoryServers.size() + replicationServers.size() )== 1) { if (debugEnabled()) TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + " remote server " + handler.getMonitorInstanceName() + " is " + "the last RS/DS to be stopped: stopping monitoring publisher"); stopMonitoringPublisher(); } if (handler.isReplicationServer()) { if (replicationServers.containsValue(handler)) @@ -1082,44 +1099,39 @@ buildAndSendTopoInfoToDSs(null); } } } else } else if (directoryServers.containsValue(handler)) { if (directoryServers.containsValue(handler)) // If this is the last DS for the domain, // shutdown the status analyzer if (directoryServers.size() == 1) { // If this is the last DS for the domain, // shutdown the status analyzer if (directoryServers.size() == 1) { if (debugEnabled()) TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + " remote server " + handler.getMonitorInstanceName() + if (debugEnabled()) TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + " remote server " + handler.getMonitorInstanceName() + " is the last DS to be stopped: stopping status analyzer"); stopStatusAnalyzer(); } stopStatusAnalyzer(); } unregisterServerHandler(handler); handler.shutdown(); unregisterServerHandler(handler); 
handler.shutdown(); // Check if generation id has to be reset mayResetGenerationId(); // Check if generation id has to be reset mayResetGenerationId(); if (!shutdown) { // Update the remote replication servers with our list // of connected LDAP servers if (!shutdown) { buildAndSendTopoInfoToRSs(); // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); } buildAndSendTopoInfoToRSs(); // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); } else if (otherHandlers.contains(handler)) { unRegisterHandler(handler); handler.shutdown(); } } else if (otherHandlers.contains(handler)) { unRegisterHandler(handler); handler.shutdown(); } } catch(Exception e) { @@ -1581,99 +1593,51 @@ // in the topology. if (senderHandler.isDataServer()) { MonitorMsg returnMsg = new MonitorMsg(msg.getDestination(), msg.getsenderID()); // Monitoring information requested by a DS MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(msg.getDestination(), msg.getsenderID()); try if (monitorMsg != null) { returnMsg.setReplServerDbState(getDbServerState()); // Update the information we have about all servers // in the topology. MonitorData md = computeMonitorData(); // Add the informations about the Replicas currently in // the topology. Iterator<Integer> it = md.ldapIterator(); while (it.hasNext()) try { int replicaId = it.next(); returnMsg.setServerState( replicaId, md.getLDAPServerState(replicaId), md.getApproxFirstMissingDate(replicaId), true); } // Add the informations about the Replication Servers // currently in the topology. it = md.rsIterator(); while (it.hasNext()) senderHandler.send(monitorMsg); } catch (IOException e) { int replicaId = it.next(); returnMsg.setServerState( replicaId, md.getRSStates(replicaId), md.getRSApproxFirstMissingDate(replicaId), false); // the connection was closed. 
} } catch (DirectoryException e) { // If we can't compute the Monitor Information, send // back an empty message. } try { senderHandler.send(returnMsg); } catch (IOException e) { // the connection was closed. } return; } MonitorMsg monitorMsg = new MonitorMsg(msg.getDestination(), msg.getsenderID()); // Populate for each connected LDAP Server // from the states stored in the serverHandler. // - the server state // - the older missing change for (DataServerHandler lsh : this.directoryServers.values()) } else { monitorMsg.setServerState( lsh.getServerId(), lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); } // Monitoring information requested by a RS MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(msg.getDestination(), msg.getsenderID()); // Same for the connected RS for (ReplicationServerHandler rsh : this.replicationServers.values()) { monitorMsg.setServerState( rsh.getServerId(), rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); } // Populate the RS state in the msg from the DbState monitorMsg.setReplServerDbState(this.getDbServerState()); try { senderHandler.send(monitorMsg); } catch (Exception e) { // We log the error. The requestor will detect a timeout or // any other failure on the connection. logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( Integer.toString((msg.getDestination())))); if (monitorMsg != null) { try { senderHandler.send(monitorMsg); } catch (Exception e) { // We log the error. The requestor will detect a timeout or // any other failure on the connection. 
logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( Integer.toString((msg.getDestination())))); } } } } else if (msg instanceof MonitorMsg) { MonitorMsg monitorMsg = (MonitorMsg) msg; receivesMonitorDataResponse(monitorMsg); GlobalServerId globalServerId = new GlobalServerId(baseDn, senderHandler.getServerId()); receivesMonitorDataResponse(monitorMsg, globalServerId); } else { logError(NOTE_ERR_ROUTING_TO_SERVER.get( @@ -1775,6 +1739,116 @@ } /** * Creates a new monitor message including monitoring information for the * whole topology. * @param sender The sender of this message. * @param destination The destination of this message. * @return The newly created and filled MonitorMsg. Null if a problem occurred * during message creation. */ public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination) { MonitorMsg returnMsg = new MonitorMsg(sender, destination); try { returnMsg.setReplServerDbState(getDbServerState()); // Update the information we have about all servers // in the topology. MonitorData md = computeMonitorData(); // Add the informations about the Replicas currently in // the topology. Iterator<Integer> it = md.ldapIterator(); while (it.hasNext()) { int replicaId = it.next(); returnMsg.setServerState( replicaId, md.getLDAPServerState(replicaId), md.getApproxFirstMissingDate(replicaId), true); } // Add the informations about the Replication Servers // currently in the topology. it = md.rsIterator(); while (it.hasNext()) { int replicaId = it.next(); returnMsg.setServerState( replicaId, md.getRSStates(replicaId), md.getRSApproxFirstMissingDate(replicaId), false); } } catch (DirectoryException e) { // If we can't compute the Monitor Information, send // back an empty message. } return returnMsg; } /** * Creates a new monitor message including monitoring information for the * topology directly connected to this RS. This includes information for: * - local RS * - all direct DSs * - all direct RSs * @param sender The sender of this message. 
* @param destination The destination of this message. * @return The newly created and filled MonitorMsg. Null if a problem occurred * during message creation. */ public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination) { MonitorMsg monitorMsg = null; try { // Lock domain as we need to go through connected servers list lock(); monitorMsg = new MonitorMsg(sender, destination); // Populate for each connected LDAP Server // from the states stored in the serverHandler. // - the server state // - the older missing change for (DataServerHandler lsh : this.directoryServers.values()) { monitorMsg.setServerState( lsh.getServerId(), lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); } // Same for the connected RS for (ReplicationServerHandler rsh : this.replicationServers.values()) { monitorMsg.setServerState( rsh.getServerId(), rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); } // Populate the RS state in the msg from the DbState monitorMsg.setReplServerDbState(this.getDbServerState()); } catch(InterruptedException e) { // At lock, too bad... } finally { if (hasLock()) release(); } return monitorMsg; } /** * Shutdown this ReplicationServerDomain. */ public void shutdown() @@ -1831,8 +1905,7 @@ /** * Send a TopologyMsg to all the connected directory servers in order to * let. * them know the topology (every known DSs and RSs) * let them know the topology (every known DSs and RSs). * @param notThisOne If not null, the topology message will not be sent to * this passed server. 
*/ @@ -1931,10 +2004,11 @@ dsInfos.add(serverHandler.toDSInfo()); } // Create info for us (local RS) // Create info for the local RS List<RSInfo> rsInfos = new ArrayList<RSInfo>(); RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), generationId, replicationServer.getGroupId()); generationId, replicationServer.getGroupId(), replicationServer.getWeight()); rsInfos.add(localRSInfo); return new TopologyMsg(dsInfos, rsInfos); @@ -1965,7 +2039,8 @@ // Add our own info (local RS) RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), generationId, replicationServer.getGroupId()); generationId, replicationServer.getGroupId(), replicationServer.getWeight()); rsInfos.add(localRSInfo); // Go through every peer RSs (and get their connected DSs), also add info @@ -2471,13 +2546,15 @@ * Start collecting global monitoring information for this * ReplicationServerDomain. * * @return The number of response that should come back. * @param expectedMonitoringMsg The list of server handler we have to wait a * monitoring message from. Will be filled as necessary by this method. * * @throws DirectoryException In case the monitoring information could * not be collected. */ int initializeMonitorData() throws DirectoryException void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg) throws DirectoryException { synchronized (monitorDataLock) { @@ -2539,7 +2616,7 @@ } // Send the request for remote monitor data to the return sendMonitorDataRequest(); sendMonitorDataRequest(expectedMonitoringMsg); } /** @@ -2566,22 +2643,25 @@ /** * Sends a MonitorRequest message to all connected RS. * @return the number of requests sent. * @param expectedMonitoringMsg The list of server handler we have to wait a * monitoring message from. Will be filled as necessary by this method. * @throws DirectoryException when a problem occurs. 
*/ protected int sendMonitorDataRequest() protected void sendMonitorDataRequest( List<GlobalServerId> expectedMonitoringMsg) throws DirectoryException { int sent = 0; try { for (ServerHandler rs : replicationServers.values()) { int serverId = rs.getServerId(); MonitorRequestMsg msg = new MonitorRequestMsg(this.replicationServer.getServerId(), rs.getServerId()); serverId); /* Register the expected answer BEFORE sending the request. The MonitorMsg response is handled by another thread; if it arrived (and was removed from the list) before being added, a stale entry would remain and the requester would wait for the whole timeout. */ expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId)); rs.send(msg); sent++; } } catch (Exception e) { @@ -2590,7 +2670,6 @@ throw new DirectoryException(ResultCode.OTHER, message, e); } return sent; } /** @@ -2598,8 +2677,10 @@ * and stores the data received. * * @param msg The message to be processed. * @param globalServerId server handler that is receiving the message. */ public void receivesMonitorDataResponse(MonitorMsg msg) private void receivesMonitorDataResponse(MonitorMsg msg, GlobalServerId globalServerId) { try { @@ -2677,7 +2758,7 @@ // Decreases the number of expected responses and potentially // wakes up the waiting requestor thread. replicationServer.responseReceived(); replicationServer.responseReceived(globalServerId); } catch (Exception e) { @@ -2832,6 +2913,57 @@ } /** * Starts the monitoring publisher for the domain. */ public void startMonitoringPublisher() { if (monitoringPublisher == null) { long period = replicationServer.getMonitoringPublisherPeriod(); if (period > 0) // 0 means no monitoring publisher { monitoringPublisher = new MonitoringPublisher(this, period); monitoringPublisher.start(); } } } /** * Stops the monitoring publisher for the domain. */ public void stopMonitoringPublisher() { if (monitoringPublisher != null) { monitoringPublisher.shutdown(); monitoringPublisher.waitForShutdown(); monitoringPublisher = null; } } /** * Tests if the monitoring publisher for this domain is running. * @return True if the monitoring publisher is running, false otherwise.
*/ public boolean isRunningMonitoringPublisher() { return (monitoringPublisher != null); } /** * Update the monitoring publisher with the new period value. * @param period The new period value. */ public void updateMonitoringPublisher(long period) { if (monitoringPublisher != null) { monitoringPublisher.setPeriod(period); } } /** * {@inheritDoc} */ @Override opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -240,6 +240,9 @@ logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg); // Create the monitoring publisher for the domain if not already started createMonitoringPublisher(); // FIXME: i think this should be done for all protocol version !! // not only those > V1 registerIntoDomain(); @@ -408,6 +411,10 @@ // other servers. } // Create the monitoring publisher for the domain if not already started createMonitoringPublisher(); registerIntoDomain(); // Process TopologyMsg sent by remote RS: store matching new info @@ -497,7 +504,18 @@ // Remote RS sent his topo msg TopologyMsg inTopoMsg = (TopologyMsg) msg; // CONNECTION WITH A RS // Store remore RS weight if it has one if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // List should only contain RS info for sender RSInfo rsInfo = inTopoMsg.getRsList().get(0); weight = rsInfo.getWeight(); } else { // Remote RS uses protocol version prior to 4 : use default value for // weight: 1 } // if the remote RS and the local RS have the same genID // then it's ok and nothing else to do @@ -646,6 +664,7 @@ RSInfo rsInfo = rsInfos.get(0); generationId = rsInfo.getGenerationId(); groupId = rsInfo.getGroupId(); weight = rsInfo.getWeight(); /** * Store info for DSs connected to the peer RS opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -244,6 +244,10 @@ */ private AtomicBoolean shuttingDown = new AtomicBoolean(false); /** * Weight of this remote server. */ protected int weight = 1; /** * Creates a new server handler instance with the provided socket. @@ -1215,12 +1219,23 @@ */ public RSInfo toRSInfo() { RSInfo rsInfo = new RSInfo(serverId, generationId, groupId); RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight); return rsInfo; } /** * Starts the monitoring publisher for the domain if not already started. */ protected void createMonitoringPublisher() { if (!replicationServerDomain.isRunningMonitoringPublisher()) { replicationServerDomain.startMonitoringPublisher(); } } /** * Performs any processing periodic processing that may be desired to update * the information associated with this monitor. Note that best-effort * attempts will be made to ensure that calls to this method come opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.MutableBoolean; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.HeartbeatMonitor; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartDSMsg; @@ -116,6 +119,28 @@ private ReplicationDomain domain = null; /** * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. */ private final MutableBoolean monitorResponse = new MutableBoolean(false); /** * A Map containing the ServerStates of all the replicas in the topology * as seen by the ReplicationServer the last time it was polled or the last * time it published monitoring information. */ private HashMap<Integer, ServerState> replicaStates = new HashMap<Integer, ServerState>(); /** * A Map containing the ServerStates of all the replication servers in the * topology as seen by the ReplicationServer the last time it was polled or * the last time it published monitoring information. */ private HashMap<Integer, ServerState> rsStates = new HashMap<Integer, ServerState>(); /** * The expected duration in milliseconds between heartbeats received * from the replication server. Zero means heartbeats are off. 
*/ @@ -1918,6 +1943,37 @@ // Try to find a suitable RS this.reStart(failingSession); } else if (msg instanceof MonitorMsg) { // This is the response to a MonitorRequest that was sent earlier or // the regular message of the monitoring publisher of the RS. // Extract and store replicas ServerStates replicaStates = new HashMap<Integer, ServerState>(); MonitorMsg monitorMsg = (MonitorMsg) msg; Iterator<Integer> it = monitorMsg.ldapIterator(); while (it.hasNext()) { int srvId = it.next(); replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId)); } // Notify the sender that the response was received. synchronized (monitorResponse) { monitorResponse.set(true); monitorResponse.notify(); } // Extract and store replication servers ServerStates rsStates = new HashMap<Integer, ServerState>(); it = monitorMsg.rsIterator(); while (it.hasNext()) { int srvId = it.next(); rsStates.put(srvId, monitorMsg.getRSServerState(srvId)); } } else { return msg; @@ -1949,6 +2005,40 @@ } /** * Gets the States of all the Replicas currently in the * Topology. * When this method is called, a Monitoring message will be sent * to the Replication Server to which this domain is currently connected * so that it computes a table containing information about * all Directory Servers in the topology. * This Computation involves communications will all the servers * currently connected and * * @return The States of all Replicas in the topology (except us) */ public Map<Integer, ServerState> getReplicaStates() { monitorResponse.set(false); // publish Monitor Request Message to the Replication Server publish(new MonitorRequestMsg(serverId, getRsServerId())); // wait for Response up to 10 seconds. try { synchronized (monitorResponse) { if (monitorResponse.get() == false) { monitorResponse.wait(10000); } } } catch (InterruptedException e) {} return replicaStates; } /** * This method allows to do the necessary computing for the window * management after treatment by the worker threads. 
* @@ -2440,7 +2530,7 @@ { ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( "Replication CN Heartbeat Thread started for " + "Replication CN Heartbeat sender for " + baseDn + " with " + getReplicationServer(), session, changeTimeHeartbeatSendInterval, serverId); ctHeartbeatPublisherThread.start(); opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -65,7 +65,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; @@ -79,7 +78,6 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.MutableBoolean; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachine; @@ -92,8 +90,6 @@ import org.opends.server.replication.protocol.HeartbeatMsg; import org.opends.server.replication.protocol.InitializeRequestMsg; import org.opends.server.replication.protocol.InitializeTargetMsg; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMsg; @@ -306,20 +302,6 @@ */ private final ChangeNumberGenerator generator; /** * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. */ private final MutableBoolean monitorResponse = new MutableBoolean(false); /** * A Map containing of the ServerStates of all the replicas in the topology * as seen by the ReplicationServer the last time it was polled. 
*/ private HashMap<Integer, ServerState> replicaStates = new HashMap<Integer, ServerState>(); Set<String> cfgEclIncludes = new HashSet<String>(); Set<String> eClIncludes = new HashSet<String>(); @@ -586,24 +568,7 @@ */ public Map<Integer, ServerState> getReplicaStates() { monitorResponse.set(false); // publish Monitor Request Message to the Replication Server broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId())); // wait for Response up to 10 seconds. try { synchronized (monitorResponse) { if (monitorResponse.get() == false) { monitorResponse.wait(10000); } } } catch (InterruptedException e) {} return replicaStates; return broker.getReplicaStates(); } /** @@ -834,26 +799,6 @@ update = (UpdateMsg) msg; generator.adjust(update.getChangeNumber()); } else if (msg instanceof MonitorMsg) { // This is the response to a MonitorRequest that was sent earlier // build the replicaStates Map. replicaStates = new HashMap<Integer, ServerState>(); MonitorMsg monitorMsg = (MonitorMsg) msg; Iterator<Integer> it = monitorMsg.ldapIterator(); while (it.hasNext()) { int serverId = it.next(); replicaStates.put( serverId, monitorMsg.getLDAPServerState(serverId)); } // Notify the sender that the response was received. synchronized (monitorResponse) { monitorResponse.set(true); monitorResponse.notify(); } } } catch (SocketTimeoutException e) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -27,7 +27,6 @@ package org.opends.server.replication; import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT; @@ -37,7 +36,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.opends.server.loggers.ErrorLogger.logError; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; @@ -56,13 +54,9 @@ import java.util.SortedSet; import java.util.TreeSet; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.api.Backend; import org.opends.server.api.ConnectionHandler; import org.opends.server.api.SynchronizationProvider; import org.opends.server.backends.MemoryBackend; import org.opends.server.config.ConfigException; import org.opends.server.controls.ExternalChangelogRequestControl; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -784,6 +784,8 @@ catch(SocketTimeoutException e) { // This is the expected result // Note that timeout should be lower than RS montoring publisher period // so that timeout occurs } //=========================================================== @@ -889,49 +891,21 @@ // Broker 2 and 3 should receive 1 change status message to order them // to enter the bad gen id status try ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2, ChangeStatusMsg.class.getName()); if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS) { ReplicationMsg msg = broker2.receive(); if (!(msg instanceof ChangeStatusMsg)) { fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" + " to enter the bad gen id status" + msg); } ChangeStatusMsg csMsg = (ChangeStatusMsg)msg; if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS) { fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" + " to enter the bad gen id status" + msg); } fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" + " to enter the bad gen id status" + csMsg); } catch(SocketTimeoutException se) csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3, ChangeStatusMsg.class.getName()); if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS) { fail("DS2 is expected to receive 1 ChangeStatusMsg to enter the " + "bad gen id status."); } try { ReplicationMsg msg = broker3.receive(); if (!(msg instanceof ChangeStatusMsg)) { fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" + " to enter the bad gen id status" + msg); } ChangeStatusMsg csMsg = (ChangeStatusMsg)msg; if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS) { fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" + " to enter the bad gen id status" + msg); } } catch(SocketTimeoutException se) { fail("DS3 is expected to receive 1 ChangeStatusMsg to enter the " + "bad gen id status."); fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" + " 
to enter the bad gen id status" + csMsg); } debugInfo("DS1 root entry must contain the new gen ID"); @@ -988,7 +962,8 @@ debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it."); broker2.publish(createAddMsg()); AddMsg emsg = (AddMsg)createAddMsg(); broker2.publish(emsg); // Updates count in RS1 must stay unchanged = to 1 Thread.sleep(500); @@ -1060,8 +1035,30 @@ isDegradedDueToGenerationId(server3ID), "Expecting that DS3 is not in bad gen id from RS1"); debugInfo("Verify that DS2 receives the add message stored in RS1 DB"); try { ReplicationMsg msg = broker2.receive(); assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg); } catch(SocketTimeoutException e) { fail("The msg stored in RS1 DB is expected to be received by DS2)"); } debugInfo("Verify that DS3 receives the add message stored in RS1 DB"); try { ReplicationMsg msg = broker3.receive(); assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg); } catch(SocketTimeoutException e) { fail("The msg stored in RS1 DB is expected to be received by DS3)"); } debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it."); AddMsg emsg = (AddMsg)createAddMsg(); emsg = (AddMsg)createAddMsg(); broker2.publish(emsg); Thread.sleep(500); @@ -1105,7 +1102,7 @@ * The following test focus on: * - genId checking across multiple starting RS (replication servers) * - genId setting propagation from one RS to the others * - genId reset propagation from one RS to the others * - genId reset propagation from one RS to the others */ @Test(enabled=false) public void testMultiRS() throws Exception @@ -1190,7 +1187,7 @@ assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId); assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId); debugInfo("Connecting broker2 to replServer1 with a bad genId"); debugInfo("Connecting broker3 to replServer1 with a bad genId"); 
try { long badgenId = 1; @@ -1215,7 +1212,7 @@ debugInfo("Connecting DS to replServer1."); connectServer1ToChangelog(changelog1ID); Thread.sleep(1000); Thread.sleep(3000); debugInfo("Adding reset task to DS."); @@ -1373,7 +1370,7 @@ /** * Loop opening sessions to the Replication Server * to check that it handle correctly deconnection and reconnection. * to check that it handle correctly disconnection and reconnection. */ @Test(enabled=false, groups="slow") public void testLoop() throws Exception opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@ import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.messages.Severity; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; @@ -65,9 +64,10 @@ import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.PersistentServerState; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.schema.DirectoryStringSyntax; @@ -260,34 +260,6 @@ broker.setSoTimeout(timeout); checkConnection(30, broker, port); // give some time to the broker to connect // to the replicationServer. if (emptyOldChanges) { /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. */ try { while (true) { ReplicationMsg rMsg = broker.receive(); if (rMsg instanceof ErrorMsg) { ErrorMsg eMsg = (ErrorMsg)rMsg; logError(new MessageBuilder( "ReplicationTestCase/openReplicationSession ").append( " received ErrorMessage when emptying old changes ").append( eMsg.getDetails()).toMessage()); } } } catch (Exception e) { logError(new MessageBuilder( "ReplicationTestCase/openReplicationSession ").append(e.getMessage()) .append(" when emptying old changes").toMessage()); } } return broker; } @@ -313,32 +285,6 @@ broker.setSoTimeout(timeout); checkConnection(30, broker, port); // give some time to the broker to connect // to the replicationServer. 
if (emptyOldChanges) { // loop receiving update until there is nothing left // to make sure that message from previous tests have been consumed. try { while (true) { ReplicationMsg rMsg = broker.receive(); if (rMsg instanceof ErrorMsg) { ErrorMsg eMsg = (ErrorMsg)rMsg; logError(new MessageBuilder( "ReplicationTestCase/openReplicationSession ").append( " received ErrorMessage when emptying old changes ").append( eMsg.getDetails()).toMessage()); } } } catch (Exception e) { logError(new MessageBuilder( "ReplicationTestCase/openReplicationSession ").append(e.getMessage()) .append(" when emptying old changes").toMessage()); } } return broker; } */ @@ -435,17 +381,6 @@ boolean emptyOldChanges) throws Exception, SocketException { return openReplicationSession(baseDn, serverId, window_size, port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges, getGenerationId(baseDn)); } protected ReplicationBroker openReplicationSession( final DN baseDn, int serverId, int window_size, int port, int timeout, int maxSendQueue, int maxRcvQueue, boolean emptyOldChanges, long generationId) throws Exception, SocketException { ServerState state = new ServerState(); if (emptyOldChanges) @@ -453,37 +388,13 @@ ReplicationBroker broker = new ReplicationBroker(null, state, baseDn.toNormalizedString(), serverId, window_size, generationId, 0, getReplSessionSecurity(), (byte)1, 500); getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); checkConnection(30, broker, port); if (timeout != 0) broker.setSoTimeout(timeout); if (emptyOldChanges) { /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. 
*/ try { while (true) { ReplicationMsg rMsg = broker.receive(); if (rMsg instanceof ErrorMsg) { ErrorMsg eMsg = (ErrorMsg)rMsg; logError(new MessageBuilder( "ReplicationTestCase/openReplicationSession ").append( " received ErrorMessage when emptying old changes ").append( eMsg.getDetails()).toMessage()); } } } catch (Exception e) { } } return broker; } @@ -575,11 +486,14 @@ logError(Message.raw(Category.SYNC, Severity.NOTICE, " ##### Calling ReplicationTestCase.classCleanUp ##### ")); // Clean RS databases cleanUpReplicationServersDB(); cleanConfigEntries(); configEntryList = null; configEntryList = new LinkedList<DN>(); cleanRealEntries(); entryList = null; entryList = new LinkedList<DN>(); // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING) // (in case our test created some emtries in it) @@ -631,6 +545,10 @@ assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)", "Found unexpected replication server config left"); // Be sure that no replication server instance is left List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances(); assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances); // Check for config entries for replication domain assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)", "Found unexpected replication domain config left"); @@ -648,6 +566,17 @@ } /** * Cleanup databases of the currently instantiated replication servers in the * VM */ protected void cleanUpReplicationServersDB() { for (ReplicationServer rs : ReplicationServer.getAllInstances()) { rs.clearDb(); } } /** * Performs a search on the config backend with the specified filter. * Fails if a config entry is found. * @param filter The filter to apply for the search @@ -1266,4 +1195,90 @@ // done } } /** * Wait for the arrival of a specific message type on the provided session * before going in timeout and failing. * @param session Session from which we should receive the message. 
* @param msgType Class of the message we are waiting for. * @return The expected message if it comes in time or fails (assertion). */ protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) { ReplicationMsg replMsg = null; int timeOut = 5000; // 5 seconds max to wait for the desired message long startTime = System.currentTimeMillis(); long curTime = startTime; int nMsg = 0; while ((curTime - startTime) <= timeOut) { try { replMsg = session.receive(); } catch (Exception ex) { fail("Exception waiting for " + msgType + " message : " + ex.getClass().getName() + " : " + ex.getMessage()); } // Get message type String rcvMsgType = replMsg.getClass().getName(); if (rcvMsgType.equals(msgType)) { // Ok, got it, let's return the expected message return replMsg; } TRACER.debugInfo("waitForSpecificMsg received : " + replMsg); nMsg++; curTime = System.currentTimeMillis(); } // Timeout fail("Failed to receive an expected " + msgType + " message after 5 seconds : also received " + nMsg + " other messages during wait time."); return null; } /** * Wait for the arrival of a specific message type on the provided broker * before going in timeout and failing. * @param broker Broker from which we should receive the message. * @param msgType Class of the message we are waiting for. * @return The expected message if it comes in time or fails (assertion). 
*/ protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) { ReplicationMsg replMsg = null; int timeOut = 5000; // 5 seconds max to wait for the desired message long startTime = System.currentTimeMillis(); long curTime = startTime; int nMsg = 0; while ((curTime - startTime) <= timeOut) { try { replMsg = broker.receive(); } catch (Exception ex) { fail("Exception waiting for " + msgType + " message : " + ex.getClass().getName() + " : " + ex.getMessage()); } // Get message type String rcvMsgType = replMsg.getClass().getName(); if (rcvMsgType.equals(msgType)) { // Ok, got it, let's return the expected message return replMsg; } TRACER.debugInfo("waitForSpecificMsg received : " + replMsg); nMsg++; curTime = System.currentTimeMillis(); } // Timeout fail("Failed to receive an expected " + msgType + " message after 5 seconds : also received " + nMsg + " other messages during wait time."); return null; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -128,6 +128,8 @@ logError(Message.raw(Category.SYNC, Severity.NOTICE, "Starting replication test : pushSchemaChange ")); cleanUpReplicationServersDB(); final DN baseDn = DN.decode("cn=schema"); ReplicationBroker broker = @@ -216,6 +218,8 @@ logError(Message.raw(Category.SYNC, Severity.NOTICE, "Starting replication test : replaySchemaChange ")); cleanUpReplicationServersDB(); final DN baseDn = DN.decode("cn=schema"); ReplicationBroker broker = @@ -253,6 +257,8 @@ logError(Message.raw(Category.SYNC, Severity.NOTICE, "Starting replication test : pushSchemaFilesChange ")); cleanUpReplicationServersDB(); final DN baseDn = DN.decode("cn=schema"); ReplicationBroker broker = opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -67,6 +67,7 @@ import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.types.*; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -300,6 +301,9 @@ logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting synchronization test : toggleReceiveStatus")); // Clean replication server database from previous run cleanUpReplicationServersDB(); final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); /* @@ -379,6 +383,9 @@ logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting replication test : lostHeartbeatFailover")); // Clean replication server database from previous run cleanUpReplicationServersDB(); final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); /* @@ -483,6 +490,9 @@ DirectoryServer.getAttributeType("entryuuid"); String monitorAttr = "resolved-modify-conflicts"; // Clean replication server database from previous run cleanUpReplicationServersDB(); /* * Open a session to the replicationServer using the broker API. * This must use a different serverId to that of the directory server. @@ -610,6 +620,9 @@ String resolvedMonitorAttr = "resolved-naming-conflicts"; String unresolvedMonitorAttr = "unresolved-naming-conflicts"; // Clean replication server database from previous run cleanUpReplicationServersDB(); /* * Open a session to the replicationServer using the ReplicationServer broker API. * This must use a serverId different from the LDAP server ID @@ -1302,6 +1315,18 @@ return new Object[][] { { false }, {true} }; } private void cleanupTest() { try { classCleanUp(); setUp(); } catch (Exception e) { fail("Test cleanup failed: " + e.getClass().getName() + " : " + e.getMessage() + " : " + StaticUtils.stackTraceToSingleLineString(e)); } } /** * Tests done using directly the ReplicationBroker interface. 
*/ @@ -1312,6 +1337,9 @@ Category.SYNC, Severity.INFORMATION, "Starting replication test : updateOperations " + assured)); // Cleanup from previous run cleanupTest(); final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); ReplicationBroker broker = @@ -1341,15 +1369,15 @@ // Check if the client has received the msg ReplicationMsg msg = broker.receive(); assertTrue(msg instanceof AddMsg, "The received replication message is not an ADD msg"); "The received replication message is not an ADD msg : " + msg); AddMsg addMsg = (AddMsg) msg; Operation receivedOp = addMsg.createOperation(connection); assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, "The received replication message is not an ADD msg"); "The received replication message is not an ADD msg : " + addMsg); assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), "The received ADD replication message is not for the excepted DN"); "The received ADD replication message is not for the excepted DN : " + addMsg); } // Modify the entry @@ -1364,12 +1392,12 @@ // See if the client has received the msg ReplicationMsg msg = broker.receive(); assertTrue(msg instanceof ModifyMsg, "The received replication message is not a MODIFY msg"); "The received replication message is not a MODIFY msg : " + msg); ModifyMsg modMsg = (ModifyMsg) msg; modMsg.createOperation(connection); assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0, "The received MODIFY replication message is not for the excepted DN"); "The received MODIFY replication message is not for the excepted DN : " + modMsg); // Modify the entry DN DN newDN = DN.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING) ; @@ -1387,12 +1415,12 @@ // See if the client has received the msg msg = broker.receive(); assertTrue(msg instanceof ModifyDNMsg, "The received replication message is not a MODIFY DN msg"); "The received replication message is not a MODIFY DN msg : " + msg); ModifyDNMsg moddnMsg = (ModifyDNMsg) msg; 
moddnMsg.createOperation(connection); assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0, "The received MODIFY_DN message is not for the excepted DN"); "The received MODIFY_DN message is not for the excepted DN : " + moddnMsg); // Delete the entry DeleteOperationBasis delOp = new DeleteOperationBasis(connection, @@ -1406,12 +1434,12 @@ // See if the client has received the msg msg = broker.receive(); assertTrue(msg instanceof DeleteMsg, "The received replication message is not a MODIFY DN msg"); "The received replication message is not a MODIFY DN msg : " + msg); DeleteMsg delMsg = (DeleteMsg) msg; delMsg.createOperation(connection); assertTrue(DN.decode(delMsg.getDn()).compareTo(DN .decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING)) == 0, "The received DELETE message is not for the excepted DN"); "The received DELETE message is not for the excepted DN : " + delMsg); /* * Now check that when we send message to the ReplicationServer @@ -1512,6 +1540,9 @@ logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting replication test : deleteNoSuchObject")); // Clean replication server database from previous run cleanUpReplicationServersDB(); DN dn = DN.decode("cn=No Such Object,ou=People," + TEST_ROOT_DN_STRING); DeleteOperationBasis op = new DeleteOperationBasis(connection, @@ -1535,6 +1566,9 @@ final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); // Clean replication server database from previous run cleanUpReplicationServersDB(); Thread.sleep(2000); ReplicationBroker broker = openReplicationSession(baseDn, 11, 100, replServerPort, 1000, true); @@ -1675,6 +1709,9 @@ final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); // Clean replication server database from previous run cleanUpReplicationServersDB(); /* * Open a session to the replicationServer using the broker API. * This must use a different serverId to that of the directory server. 
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -501,7 +501,7 @@ // Send topo view List<RSInfo> rsList = new ArrayList<RSInfo>(); RSInfo rsInfo = new RSInfo(serverId, generationId, groupId); RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1); rsList.add(rsInfo); TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(), rsList); @@ -719,7 +719,7 @@ } /** * Read the coming seaf read mode updates and send back acks with errors * Read the coming safe read mode updates and send back acks with errors */ private void executeSafeReadManyErrorsScenario() { @@ -1058,7 +1058,7 @@ } /** * Tests parameters sent in session handshake an updates, when not using * Tests parameters sent in session handshake and updates, when not using * assured replication */ @Test opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -836,7 +836,7 @@ fail("Unknown replication server id."); } return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId); return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1); } /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -1092,11 +1092,11 @@ dsList4.add(dsInfo2); dsList4.add(dsInfo1); RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103); RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1); RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0); RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1); RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98); RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1); List<RSInfo> rsList1 = new ArrayList<RSInfo>(); rsList1.add(rsInfo1); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -1026,13 +1026,13 @@ dsList4.add(dsInfo2); dsList4.add(dsInfo1); RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103); RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1); RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0); RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1); RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98); RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1); RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98); RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1); List<RSInfo> rsList1 = new ArrayList<RSInfo>(); rsList1.add(rsInfo1); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -29,7 +29,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.math.BigInteger; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -595,6 +594,9 @@ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100, replServers, groupId, assuredTimeout, 5000); // No monitoring publisher to not interfer with some SocketTimeoutException // expected at some points in these tests conf.setMonitoringPeriod(0L); ReplicationServer replicationServer = new ReplicationServer(conf); return replicationServer; @@ -908,7 +910,7 @@ ReplicationMsg replMsg = session.receive(); if (replMsg instanceof ErrorMsg) { // Support for connection done with bad gen id : we receive an error // Support for connection done with bad gen id : we receive an error // message that we must throw away before reading our ack. replMsg = session.receive(); } @@ -967,7 +969,7 @@ } // Send our topo mesg RSInfo rsInfo = new RSInfo(serverId, generationId, groupId); RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1); List<RSInfo> rsInfos = new ArrayList<RSInfo>(); rsInfos.add(rsInfo); TopologyMsg topoMsg = new TopologyMsg(null, rsInfos); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -63,8 +63,11 @@ // The weight of the server private int weight = 1; // The monitoring publisher period private long monitoringPeriod = 3000; /** * Constructor without goup id, assured info and weight * Constructor without group id, assured info and weight */ public ReplServerFakeConfiguration( int port, String dirName, int purgeDelay, int serverId, @@ -254,4 +257,17 @@ return weight; } public long getMonitoringPeriod() { return monitoringPeriod; } /** * @param monitoringPeriod the monitoringPeriod to set */ public void setMonitoringPeriod(long monitoringPeriod) { this.monitoringPeriod = monitoringPeriod; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -74,7 +74,6 @@ import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartDSMsg; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartMsg; @@ -1003,7 +1002,7 @@ ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1); session.publish(msg); // Read the Replication Server state from the ReplServerStartMsg that // Read the Replication Server state from the ReplServerStartDSMsg that // comes back. ReplServerStartDSMsg replStartDSMsg = (ReplServerStartDSMsg) session.receive(); @@ -1079,7 +1078,8 @@ // check that this did not change the window by sending a probe again. session.publish(new WindowProbeMsg()); windowMsg = (WindowMsg) session.receive(); // We may receive some MonitoringMsg so use filter method windowMsg = (WindowMsg)waitForSpecificMsg(session, WindowMsg.class.getName()); assertEquals(serverwindow, windowMsg.getNumAck()); debugInfo("Ending windowProbeTest"); }