opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -230,14 +230,7 @@
     return newStatus;
   }

-  /**
-   * Retrieves a set of attributes containing monitor data that should be
-   * returned to the client if the corresponding monitor entry is requested.
-   *
-   * @return A set of attributes containing monitor data that should be
-   *         returned to the client if the corresponding monitor entry is
-   *         requested.
-   */
+  /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
   {
@@ -426,8 +419,7 @@
       }
     }

-    // lock with no timeout
-    lockDomain(false);
+    lockDomainNoTimeout();

     localGenerationId = replicationServerDomain.getGenerationId();
     oldGenerationId = localGenerationId;
@@ -501,9 +493,7 @@
     }
     finally
     {
-      if (replicationServerDomain != null &&
-          replicationServerDomain.hasLock())
-        replicationServerDomain.release();
+      releaseDomainLock();
     }
   }
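Note on the pattern above: taken together, these DataServerHandler hunks converge on one shape for the start-handshake code: acquire the domain lock through the new lockDomainNoTimeout() helper, run the handshake, and always release it through releaseDomainLock() in the finally block (the helper itself is added in the ServerHandler.java hunks further down and releases only if the lock is actually held). The sketch below only illustrates that shape; the method name doHandshake and the surrounding structure are placeholders, not code from this patch.

    // Illustrative sketch (not part of the patch): the lock/handshake/release
    // structure shared by the refactored handlers.
    private void doHandshake() throws DirectoryException, InterruptedException
    {
      try
      {
        lockDomainNoTimeout();   // listen-side code uses lockDomainWithTimeout()

        localGenerationId = replicationServerDomain.getGenerationId();
        oldGenerationId = localGenerationId;
        // ... exchange start and topology messages with the peer ...
      }
      finally
      {
        // Safe even if the lock was never acquired: the helper checks hasLock().
        releaseDomainLock();
      }
    }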
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -427,22 +427,14 @@
   {
     try
     {
-      // Process start from remote
       boolean sessionInitiatorSSLEncryption =
          processStartFromRemote(inECLStartMsg);

-      // lock with timeout
-      if (replicationServerDomain != null)
-      {
-        lockDomain(true);
-      }
+      lockDomainWithTimeout();

       localGenerationId = -1;

-      // send start to remote
       StartMsg outStartMsg = sendStartToRemote();

-      // log
       logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);

       // until here session is encrypted then it depends on the negotiation
@@ -455,8 +447,7 @@
       waitAndProcessStartSessionECLFromRemoteServer();
       if (inStartECLSessionMsg == null)
       {
-        // client wants to properly close the connection (client sent a
-        // StopMsg)
+        // client wants to properly close the connection (client sent a StopMsg)
         logStopReceived();
         abortStart(null);
         return;
@@ -464,7 +455,6 @@
       logStartECLSessionHandshake(inStartECLSessionMsg);

-      // initialization
       initialize(inStartECLSessionMsg);
     }
     catch(DirectoryException de)
@@ -477,11 +467,7 @@
     }
     finally
     {
-      if ((replicationServerDomain != null) &&
-          replicationServerDomain.hasLock())
-      {
-        replicationServerDomain.release();
-      }
+      releaseDomainLock();
     }
   }
@@ -956,14 +942,7 @@
         + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
   }

-  /**
-   * Retrieves a set of attributes containing monitor data that should be
-   * returned to the client if the corresponding monitor entry is requested.
-   *
-   * @return A set of attributes containing monitor data that should be
-   *         returned to the client if the corresponding monitor entry is
-   *         requested.
-   */
+  /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
   {

opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -177,6 +177,15 @@
   }

+  /**
+   * Returns the shutdown flag.
+   * @return The shutdown flag value.
+   */
+  public boolean shuttingDown()
+  {
+    return shuttingDown.get();
+  }
+
   /**
    * Returns the Replication Server Domain to which belongs this handler.
    *
    * @param createIfNotExist Creates the domain if it does not exist.
@@ -204,14 +213,7 @@
     return inCount;
   }

-  /**
-   * Retrieves a set of attributes containing monitor data that should be
-   * returned to the client if the corresponding monitor entry is requested.
-   *
-   * @return A set of attributes containing monitor data that should be
-   *         returned to the client if the corresponding monitor entry is
-   *         requested.
-   */
+  /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
   {
@@ -571,7 +573,7 @@
   }

   /**
-   * Increase the counter of update received from the server.
+   * Increase the counter of updates received from the server.
    */
   public void incrementInCount()
   {
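The shuttingDown() accessor added above reads an AtomicBoolean flag; the ServerHandler.java hunks below remove the same flag and its engageShutdown() method from the subclass, so both now live at the MessageHandler level. For reference, here is a small self-contained sketch of that idempotent-shutdown pattern; the class name and the shutdown() body are illustrative, not taken from the patch.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative sketch (not part of the patch): engageShutdown() flips the
    // flag exactly once and reports whether it was already set, while
    // shuttingDown() merely observes it.
    class ShutdownFlagSketch
    {
      private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

      /** @return the previous value, so only the first caller sees false. */
      boolean engageShutdown()
      {
        return shuttingDown.getAndSet(true);
      }

      boolean shuttingDown()
      {
        return shuttingDown.get();
      }

      void shutdown()
      {
        if (engageShutdown())
        {
          return; // another thread already started the shutdown sequence
        }
        // ... close the session and stop reader/writer threads, exactly once ...
      }
    }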
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -154,7 +154,7 @@
     try
     {
-      lockDomain(false); // no timeout
+      lockDomainNoTimeout();

       ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
@@ -210,7 +210,7 @@
     {
       /* Only protocol version above V1 has a phase 2 handshake

-         NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
+         NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
          TopologyMsg then TopologyMsg (with a RS)

          Send our own TopologyMsg to remote RS
@@ -280,9 +280,7 @@
     }
     finally
     {
-      if (replicationServerDomain != null &&
-          replicationServerDomain.hasLock())
-        replicationServerDomain.release();
+      releaseDomainLock();
     }
   }
@@ -300,8 +298,7 @@
       // The initiator decides if the session is encrypted
       sslEncryption = processStartFromRemote(inReplServerStartMsg);

-      // lock with timeout
-      lockDomain(true);
+      lockDomainWithTimeout();

       if (replicationServerDomain.isAlreadyConnectedToRS(this))
       {
@@ -418,9 +415,7 @@
     }
     finally
     {
-      if (replicationServerDomain != null &&
-          replicationServerDomain.hasLock())
-        replicationServerDomain.release();
+      releaseDomainLock();
     }
   }
@@ -498,55 +493,49 @@
    */
   private void checkGenerationId()
   {
-    if (localGenerationId > 0)
-    {
-      // the local RS is initialized
-      if (generationId > 0
-      // the remote RS is initialized. If not, there's nothing to do anyway.
-          && generationId != localGenerationId)
-      {
-        /* Either:
-         *
-         * 1) The 2 RS have different generationID
-         * replicationServerDomain.getGenerationIdSavedStatus() == true
-         *
-         * if the present RS has received changes regarding its
-         * gen ID and so won't change without a reset
-         * then we are just degrading the peer.
-         *
-         * 2) This RS has never received any changes for the current
-         * generation ID.
-         *
-         * Example case:
-         * - we are in RS1
-         * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
-         * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
-         * - we are in RS1 and we receive a START msg from RS2
-         * - Each RS keeps its genID / is degraded and when LS2
-         * will be populated from LS1 everything will become ok.
-         *
-         * Issue:
-         * FIXME : Would it be a good idea in some cases to just set the
-         * gen ID received from the peer RS specially if the peer has a
-         * non null state and we have a null state ?
-         * replicationServerDomain.setGenerationId(generationId, false);
-         */
-        Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
-          serverId, session.getReadableRemoteAddress(), generationId,
-          getBaseDN(), getReplicationServerId(), localGenerationId);
-        logError(message);
-      }
-    }
-    else if (localGenerationId <= 0)
+    if (localGenerationId <= 0)
     {
-      /* The local RS is not initialized - take the one received
-       WARNING: Must be done before computing topo message to send to peer
-       server as topo message must embed valid generation id for our server */
+      // The local RS is not initialized - take the one received
+      // WARNING: Must be done before computing topo message to send to peer
+      // server as topo message must embed valid generation id for our server
       oldGenerationId =
          replicationServerDomain.changeGenerationId(generationId, false);
     }
+
+    // the local RS is initialized
+    if (generationId > 0
+        // the remote RS is initialized. If not, there's nothing to do anyway.
+        && generationId != localGenerationId)
+    {
+      /* Either:
+       *
+       * 1) The 2 RS have different generationID
+       * replicationServerDomain.getGenerationIdSavedStatus() == true
+       *
+       * if the present RS has received changes regarding its gen ID and so will
+       * not change without a reset then we are just degrading the peer.
+       *
+       * 2) This RS has never received any changes for the current gen ID.
+       *
+       * Example case:
+       * - we are in RS1
+       * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+       * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+       * - we are in RS1 and we receive a START msg from RS2
+       * - Each RS keeps its genID / is degraded and when LS2
+       * will be populated from LS1 everything will become ok.
+       *
+       * Issue:
+       * FIXME : Would it be a good idea in some cases to just set the gen ID
+       * received from the peer RS specially if the peer has a non null state
+       * and we have a null state ?
+       * replicationServerDomain.setGenerationId(generationId, false);
+       */
+      Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
+          serverId, session.getReadableRemoteAddress(), generationId,
+          getBaseDN(), getReplicationServerId(), localGenerationId);
+      logError(message);
+    }
   }

   /**
@@ -584,7 +573,11 @@
   public void shutdown()
   {
     super.shutdown();
-    // Stop the remote LSHandler
+    clearRemoteLSHandlers();
+  }
+
+  private void clearRemoteLSHandlers()
+  {
     synchronized (remoteDirectoryServers)
     {
       for (LightweightServerHandler lsh : remoteDirectoryServers.values())
@@ -594,6 +587,7 @@
       remoteDirectoryServers.clear();
     }
   }
+
   /**
    * Stores topology information received from a peer RS and that must be kept
    * in RS handler.
@@ -602,29 +596,20 @@
    */
   public void processTopoInfoFromRS(TopologyMsg topoMsg)
   {
-    // Store info for remote RS
-    List<RSInfo> rsInfos = topoMsg.getRsList();
     // List should only contain RS info for sender
-    RSInfo rsInfo = rsInfos.get(0);
+    final RSInfo rsInfo = topoMsg.getRsList().get(0);
     generationId = rsInfo.getGenerationId();
     groupId = rsInfo.getGroupId();
     weight = rsInfo.getWeight();

-    // Store info for DSs connected to the peer RS
-    List<DSInfo> dsInfos = topoMsg.getDsList();
-
     synchronized (remoteDirectoryServers)
     {
-      // Removes the existing structures
-      for (LightweightServerHandler lsh : remoteDirectoryServers.values())
-      {
-        lsh.stopHandler();
-      }
-      remoteDirectoryServers.clear();
+      clearRemoteLSHandlers();

       // Creates the new structure according to the message received.
-      for (DSInfo dsInfo : dsInfos)
+      for (DSInfo dsInfo : topoMsg.getDsList())
       {
         // For each DS connected to the peer RS
         LightweightServerHandler lsh = new LightweightServerHandler(this,
           serverId, dsInfo.getDsId(), dsInfo.getDsUrl(),
           dsInfo.getGenerationId(), dsInfo.getGroupId(), dsInfo.getStatus(),
@@ -670,10 +655,7 @@
    */
   public boolean hasRemoteLDAPServers()
   {
-    synchronized (remoteDirectoryServers)
-    {
-      return !remoteDirectoryServers.isEmpty();
-    }
+    return !remoteDirectoryServers.isEmpty();
   }

   /**
@@ -682,10 +664,7 @@
    */
   public Set<Integer> getConnectedDirectoryServerIds()
   {
-    synchronized (remoteDirectoryServers)
-    {
-      return remoteDirectoryServers.keySet();
-    }
+    return remoteDirectoryServers.keySet();
   }

   /**
@@ -698,14 +677,7 @@
         + ",cn=" + replicationServerDomain.getMonitorInstanceName();
   }

-  /**
-   * Retrieves a set of attributes containing monitor data that should be
-   * returned to the client if the corresponding monitor entry is requested.
-   *
-   * @return A set of attributes containing monitor data that should be
-   *         returned to the client if the corresponding monitor entry is
-   *         requested.
-   */
+  /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
   {

opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -32,7 +32,6 @@
 import java.util.Random;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.opends.messages.Message;
@@ -206,11 +205,6 @@
   protected boolean shutdownWriter = false;

   /**
-   * Set when ServerHandler is stopping.
-   */
-  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
-
-  /**
    * Weight of this remote server.
    */
   protected int weight = 1;
@@ -253,8 +247,7 @@
       closeSession(localSession, reason, this);
     }

-    if (replicationServerDomain != null && replicationServerDomain.hasLock())
-      replicationServerDomain.release();
+    releaseDomainLock();

     // If generation id of domain was changed, set it back to old value
     // We may have changed it as it was -1 and we received a value >0 from
@@ -268,6 +261,17 @@
   }

   /**
+   * Releases the lock on the replication server domain if it was held.
+   */
+  protected void releaseDomainLock()
+  {
+    if (replicationServerDomain != null && replicationServerDomain.hasLock())
+    {
+      replicationServerDomain.release();
+    }
+  }
+
+  /**
    * Check the protocol window and send WindowMsg if necessary.
    *
    * @throws IOException when the session becomes unavailable.
@@ -295,25 +299,6 @@
   }

   /**
-   * Set the shut down flag to true and returns the previous value of the flag.
-   * @return The previous value of the shut down flag
-   */
-  @Override
-  public boolean engageShutdown()
-  {
-    return shuttingDown.getAndSet(true);
-  }
-
-  /**
-   * Returns the shutdown flag.
-   * @return The shutdown flag value.
-   */
-  public boolean shuttingDown()
-  {
-    return shuttingDown.get();
-  }
-
-  /**
    * Finalize the initialization, create reader, writer, heartbeat system
    * and monitoring system.
    * @throws DirectoryException When an exception is raised.
@@ -542,14 +527,7 @@
     return inCount;
   }

-  /**
-   * Retrieves a set of attributes containing monitor data that should be
-   * returned to the client if the corresponding monitor entry is requested.
-   *
-   * @return A set of attributes containing monitor data that should be
-   *         returned to the client if the corresponding monitor entry is
-   *         requested.
-   */
+  /** {@inheritDoc} */
   @Override
   public List<Attribute> getMonitorData()
   {
@@ -739,24 +717,6 @@
   }

   /**
-   * Increase the counter of update received from the server.
-   */
-  @Override
-  public void incrementInCount()
-  {
-    inCount++;
-  }
-
-  /**
-   * Increase the counter of updates sent to the server.
-   */
-  @Override
-  public void incrementOutCount()
-  {
-    outCount++;
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -783,81 +743,91 @@
     return !this.isDataServer();
   }

+  // The handshake phase must be done by blocking any access to structures
+  // keeping info on connected servers, so that one can safely check for
+  // pre-existence of a server, send a coherent snapshot of known topology to
+  // peers, update the local view of the topology...
+  //
+  // For instance a kind of problem could be that while we connect with a
+  // peer RS, a DS is connecting at the same time and we could publish the
+  // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
+  //
+  // This method and every others that need to read/make changes to the
+  // structures holding topology for the domain should:
+  // - call ReplicationServerDomain.lock()
+  // - read/modify structures
+  // - call ReplicationServerDomain.release()
+  //
+  // More information is provided in comment of ReplicationServerDomain.lock()

   /**
-   * Lock the domain potentially with a timeout.
+   * Lock the domain without a timeout.
+   * <p>
+   * If domain already exists, lock it until handshake is finished otherwise it
+   * will be created and locked later in the method
    *
-   * @param timedout
-   *          The provided timeout.
    * @throws DirectoryException
    *           When an exception occurs.
    * @throws InterruptedException
    *           If the current thread was interrupted while waiting for the lock.
    */
-  protected void lockDomain(boolean timedout)
-      throws DirectoryException, InterruptedException
+  public void lockDomainNoTimeout()
+      throws DirectoryException, InterruptedException
   {
-    // The handshake phase must be done by blocking any access to structures
-    // keeping info on connected servers, so that one can safely check for
-    // pre-existence of a server, send a coherent snapshot of known topology
-    // to peers, update the local view of the topology...
-    //
-    // For instance a kind of problem could be that while we connect with a
-    // peer RS, a DS is connecting at the same time and we could publish the
-    // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
-    //
-    // This method and every others that need to read/make changes to the
-    // structures holding topology for the domain should:
-    // - call ReplicationServerDomain.lock()
-    // - read/modify structures
-    // - call ReplicationServerDomain.release()
-    //
-    // More information is provided in comment of ReplicationServerDomain.lock()
-
-    // If domain already exists, lock it until handshake is finished otherwise
-    // it will be created and locked later in the method
-    if (!timedout)
+    if (!replicationServerDomain.hasLock())
     {
-      if (!replicationServerDomain.hasLock())
-        replicationServerDomain.lock();
+      replicationServerDomain.lock();
     }
   }

+  /**
+   * Lock the domain with a timeout.
+   * <p>
+   * Take the lock on the domain. WARNING: Here we try to acquire the lock with
+   * a timeout. This is for preventing a deadlock that may happen if there are
+   * cross connection attempts (for same domain) from this replication server
+   * and from a peer one.
+   * <p>
+   * Here is the scenario:
+   * <ol>
+   * <li>RS1 connect thread takes the domain lock and starts connection to RS2</li>
+   * <li>at the same time RS2 connect thread takes his domain lock and start
+   * connection to RS2</li>
+   * <li>RS2 listen thread starts processing received ReplServerStartMsg from
+   * RS1 and wants to acquire the lock on the domain (here) but cannot as RS2
+   * connect thread already has it</li>
+   * <li>RS1 listen thread starts processing received ReplServerStartMsg from
+   * RS2 and wants to acquire the lock on the domain (here) but cannot as RS1
+   * connect thread already has it</li>
+   * </ol>
+   * => Deadlock: 4 threads are locked.
+   * <p>
+   * To prevent threads locking in such situation, the listen threads here will
+   * both timeout trying to acquire the lock. The random time for the timeout
+   * should allow on connection attempt to be aborted whereas the other one
+   * should have time to finish in the same time.
+   * <p>
+   * Warning: the minimum time (3s) should be big enough to allow normal
+   * situation connections to terminate. The added random time should represent
+   * a big enough range so that the chance to have one listen thread timing out
+   * a lot before the peer one is great. When the first listen thread times out,
+   * the remote connect thread should release the lock and allow the peer listen
+   * thread to take the lock it was waiting for and process the connection
+   * attempt.
+   *
+   * @throws DirectoryException
+   *           When an exception occurs.
+   * @throws InterruptedException
+   *           If the current thread was interrupted while waiting for the lock.
+   */
+  public void lockDomainWithTimeout()
+      throws DirectoryException, InterruptedException
+  {
+    if (replicationServerDomain == null)
+    {
+      return;
+    }
+
-    /**
-     * Take the lock on the domain.
-     * WARNING: Here we try to acquire the lock with a timeout. This
-     * is for preventing a deadlock that may happen if there are cross
-     * connection attempts (for same domain) from this replication
-     * server and from a peer one:
-     * Here is the scenario:
-     * - RS1 connect thread takes the domain lock and starts
-     * connection to RS2
-     * - at the same time RS2 connect thread takes his domain lock and
-     * start connection to RS2
-     * - RS2 listen thread starts processing received
-     * ReplServerStartMsg from RS1 and wants to acquire the lock on
-     * the domain (here) but cannot as RS2 connect thread already has
-     * it
-     * - RS1 listen thread starts processing received
-     * ReplServerStartMsg from RS2 and wants to acquire the lock on
-     * the domain (here) but cannot as RS1 connect thread already has
-     * it
-     * => Deadlock: 4 threads are locked.
-     * So to prevent that in such situation, the listen threads here
-     * will both timeout trying to acquire the lock. The random time
-     * for the timeout should allow on connection attempt to be
-     * aborted whereas the other one should have time to finish in the
-     * same time.
-     * Warning: the minimum time (3s) should be big enough to allow
-     * normal situation connections to terminate. The added random
-     * time should represent a big enough range so that the chance to
-     * have one listen thread timing out a lot before the peer one is
-     * great. When the first listen thread times out, the remote
-     * connect thread should release the lock and allow the peer
-     * listen thread to take the lock it was waiting for and process
-     * the connection attempt.
-     */
     Random random = new Random();
     int randomTime = random.nextInt(6); // Random from 0 to 5
     // Wait at least 3 seconds + (0 to 5 seconds)
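The new lockDomainWithTimeout() javadoc describes the timeout policy (at least 3 seconds plus a random 0 to 5 seconds), but the hunk is cut off before the acquisition itself. Below is a self-contained sketch of that timed-acquisition pattern using a plain ReentrantLock; in the server the lock lives on the ReplicationServerDomain and a failure is reported as a DirectoryException, so the lock field, the tryLock call and the exception used here are stand-ins rather than the patch's code.

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch (not part of the patch) of the "3s + random(0..5)s"
    // timed acquisition described in the lockDomainWithTimeout() javadoc.
    class TimedDomainLockSketch
    {
      private final ReentrantLock domainLock = new ReentrantLock();

      void lockDomainWithTimeout() throws InterruptedException
      {
        Random random = new Random();
        int randomTime = random.nextInt(6);          // random from 0 to 5
        long timeoutMs = 3000 + randomTime * 1000L;  // wait at least 3 seconds

        if (!domainLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS))
        {
          // The peer's connect thread is expected to release its lock first,
          // letting one of the two crossed connection attempts complete.
          throw new IllegalStateException(
              "could not acquire the domain lock within " + timeoutMs + " ms");
        }
      }
    }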