| | |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | |
| | | /** |
| | | * This class defines an in-memory cache that will be used to store |
| | |
| | | */ |
| | | public class ReplicationServerDomain |
| | | { |
| | | |
| | | private final Object flowControlLock = new Object(); |
| | | private final DN baseDn; |
| | | private final String baseDn; |
| | | // The status analyzer that periodically verifies whether the connected DSs |
| | | // are late or not |
| | | private StatusAnalyzer statusAnalyzer = null; |
| | |
| | | private MonitorData wrkMonitorData; |
| | | |
| | | /** |
| | | * The needed info for each received assured update message we are waiting |
| | | * acks for. |
| | | * Key: a change number matching a received update message which requested |
| | | * assured mode usage (either safe read or safe data mode) |
| | | * Value: The object holding every info needed about the already received acks |
| | | * as well as the acks to be received. |
| | | * For more details, see ExpectedAcksInfo and its sub classes javadoc. |
| | | */ |
| | | private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks = |
| | | new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>(); |
| | | |
| | | // The timer used to run the timeout code (timer tasks) for the assured update |
| | | // messages we are waiting acks for. |
| | | private Timer assuredTimeoutTimer = null; |
| | | // Counter used to purge the timer task references in assuredTimeoutTimer, |
| | | // every n treated assured messages |
| | | private int assuredTimeoutTimerPurgeCounter = 0; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ReplicationServerDomain. |
| | | * @param replicationServer the ReplicationServer that created this |
| | | * replicationServer cache. |
| | | */ |
| | | public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer) |
| | | public ReplicationServerDomain( |
| | | String baseDn, ReplicationServer replicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " + |
| | | baseDn + " in RS " + replicationServer.getServerId(), true); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void put(UpdateMsg update, ServerHandler sourceHandler) |
| | | throws IOException |
| | | { |
| | | /* |
| | | * TODO : In case that the source server is a LDAP server this method |
| | | * should check that change did get pushed to at least one |
| | | * other replication server before pushing it to the LDAP servers |
| | | */ |
| | | |
| | | short id = update.getChangeNumber().getServerId(); |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | short id = cn.getServerId(); |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | | |
| | | if (update.isAssured()) |
| | | { |
| | | int count = this.NumServers(); |
| | | if (count > 1) |
| | | { |
| | | if (sourceHandler.isReplicationServer()) |
| | | ServerHandler.addWaitingAck(update, sourceHandler.getServerId(), |
| | | this, count - 1); |
| | | else |
| | | sourceHandler.addWaitingAck(update, count - 1); |
| | | } else |
| | | { |
| | | sourceHandler.sendAck(update.getChangeNumber()); |
| | | } |
| | | } |
| | | |
| | | if (generationId < 0) |
| | | { |
| | | generationId = sourceHandler.getGenerationId(); |
| | |
| | | // Publish the messages to the source handler |
| | | dbHandler.add(update); |
| | | |
| | | /** |
| | | * If this is an assured message (a message requesting ack), we must |
| | | * construct the ExpectedAcksInfo object with the right number of expected |
| | | * acks before posting message to the writers. Otherwise some writers may |
| | | * have time to post, receive the ack and increment received ack counter |
| | | * (kept in ExpectedAcksInfo object) and we could think the acknowledgment |
| | | * is fully processed although it may be not (some other acks from other |
| | | * servers are not yet arrived). So for that purpose we do a pre-loop |
| | | * to determine to who we will post an assured message. |
| | | * Whether the assured mode is safe read or safe data, we anyway do not |
| | | * support the assured replication feature across topologies with different |
| | | * group ids. The assured feature ensures assured replication based on the |
| | | * same locality (group id). For instance in double data center deployment |
| | | * (2 group id usage) with assured replication enabled, an assured message |
| | | * sent from data center 1 (group id = 1) will be sent to servers of both |
| | | * data centers, but one will request and wait acks only from servers of the |
| | | * data center 1. |
| | | */ |
| | | boolean assuredMessage = update.isAssured(); |
| | | PreparedAssuredInfo preparedAssuredInfo = null; |
| | | if (assuredMessage) |
| | | { |
| | |   // Assured feature is supported starting from replication protocol V2; |
| | |   // for an older sender the assured handling is simply switched off. |
| | |   if (sourceHandler.getProtocolVersion() >= |
| | |     ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | |   { |
| | |     // According to assured sub-mode, prepare structures to keep track of |
| | |     // the acks we are interested in. Each mode must be routed to its own |
| | |     // helper: safe data -> processSafeDataUpdateMsg, |
| | |     // safe read -> processSafeReadUpdateMsg. |
| | |     AssuredMode assuredMode = update.getAssuredMode(); |
| | |     if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | |     { |
| | |       preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler); |
| | |     } else if (assuredMode == AssuredMode.SAFE_READ_MODE) |
| | |     { |
| | |       preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler); |
| | |     } else |
| | |     { |
| | |       // Unknown assured mode: should never happen. Log it and fall back to |
| | |       // non assured processing so that preparedAssuredInfo (still null) is |
| | |       // never dereferenced later on. |
| | |       Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | |         Short.toString(replicationServer.getServerId()), |
| | |         assuredMode.toString(), baseDn, update.toString()); |
| | |       logError(errorMsg); |
| | |       assuredMessage = false; |
| | |     } |
| | |   } else |
| | |   { |
| | |     assuredMessage = false; |
| | |   } |
| | | } |
| | | |
| | | List<Short> expectedServers = null; |
| | | if (assuredMessage) |
| | | { |
| | | expectedServers = preparedAssuredInfo.expectedServers; |
| | | if (expectedServers != null) |
| | | { |
| | | // Store the expected acks info into the global map. |
| | | // The code for processing reception of acks for this update will update |
| | | // info kept in this object and if enough acks received, it will send |
| | | // back the final ack to the requester and remove the object from this |
| | | // map |
| | | // OR |
| | | // The following timer will time out and send an timeout ack to the |
| | | // requester if the acks are not received in time. The timer will also |
| | | // remove the object from this map. |
| | | waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo); |
| | | |
| | | // Arm timer for this assured update message (wait for acks until it |
| | | // times out) |
| | | AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn); |
| | | assuredTimeoutTimer.schedule(assuredTimeoutTask, |
| | | replicationServer.getAssuredTimeout()); |
| | | // Purge timer every 100 treated messages |
| | | assuredTimeoutTimerPurgeCounter++; |
| | | if ((assuredTimeoutTimerPurgeCounter % 100) == 0) |
| | | assuredTimeoutTimer.purge(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The update message equivalent to the originally received update message, |
| | | * but with assured flag disabled. This message is the one that should be |
| | | * sent to servers that are not eligible for assured mode. |
| | | * We need a clone-like copy of the original message with the assured flag |
| | | * off, to be posted to servers we don't want to wait for an ack from |
| | | * (servers not in normal status or servers with a different group id). This |
| | | * must be done because the posted message is a reference, so each writer |
| | | * queue gets the same reference; changing the assured flag of the object |
| | | * would therefore change it for every reference posted on every writer |
| | | * queue. That is why we need one version of the message with the assured |
| | | * flag on and another one with the assured flag off. |
| | | */ |
| | | NotAssuredUpdateMsg notAssuredUpdate = null; |
| | | |
| | | /* |
| | | * Push the message to the replication servers |
| | | */ |
| | | if (!sourceHandler.isReplicationServer()) |
| | | if (sourceHandler.isLDAPserver()) |
| | | { |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | " for dn " + baseDn.toNormalizedString() + ", update " + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to replication server " + |
| | | Short.toString(handler.getServerId()) + " with generation id " + |
| | |
| | | continue; |
| | | } |
| | | |
| | | handler.add(update, sourceHandler); |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination |
| | | // server |
| | | if ((expectedServers != null) && expectedServers.contains(handler. |
| | | getServerId())) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | handler.add(notAssuredUpdate, sourceHandler); |
| | | } |
| | | } else |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | for (ServerHandler handler : directoryServers.values()) |
| | | { |
| | | // don't forward the change to the server that just sent it |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | | { |
| | | continue; |
| | |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | " for dn " + baseDn.toNormalizedString() + ", update " + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to directory server " + |
| | | Short.toString(handler.getServerId()) + " with generation id " + |
| | |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | " for dn " + baseDn.toNormalizedString() + ", update " + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to directory server " + |
| | | Short.toString(handler.getServerId()) + |
| | |
| | | continue; |
| | | } |
| | | |
| | | handler.add(update, sourceHandler); |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination |
| | | // server |
| | | if ((expectedServers != null) && expectedServers.contains(handler. |
| | | getServerId())) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | handler.add(notAssuredUpdate, sourceHandler); |
| | | } |
| | | } else |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Helper class used as the return type of the methods that process a just |
| | |  * received assured update message: |
| | |  * - processSafeReadUpdateMsg |
| | |  * - processSafeDataUpdateMsg |
| | |  * It only packs together the objects those methods have to return. |
| | |  * It is a static nested class because it never uses the enclosing domain |
| | |  * instance, so it does not need a hidden reference to it. |
| | |  */ |
| | | private static class PreparedAssuredInfo |
| | | { |
| | |   /** |
| | |    * The list of servers identified as servers we are interested in |
| | |    * receiving acks from. If this list is not null, then expectedAcksInfo |
| | |    * should be not null. |
| | |    * Servers that are not in this list are servers not eligible for an ack |
| | |    * request. |
| | |    */ |
| | |   public List<Short> expectedServers = null; |
| | | |
| | |   /** |
| | |    * The constructed ExpectedAcksInfo object to be used when acks will be |
| | |    * received. Null if expectedServers is null. |
| | |    */ |
| | |   public ExpectedAcksInfo expectedAcksInfo = null; |
| | | } |
| | | |
| | | /** |
| | |  * Processes a just received assured update message in Safe Read mode. |
| | |  * <p> |
| | |  * Determines the servers an ack is requested from. Acks are only requested |
| | |  * when the sender has the same group id as this replication server; in that |
| | |  * case the expected servers are: |
| | |  * - every connected RS with that group id (only when the sender is a DS), |
| | |  * - every other connected DS with that group id that is in normal status. |
| | |  * Same-group DSs that are not in normal status are recorded as wrong status |
| | |  * servers. If no server is expected to ack, the ack is sent back to the |
| | |  * sender immediately. |
| | |  * This method is a helper for the put method. Have a look at the put method |
| | |  * for a better understanding. |
| | |  * @param update The just received assured update to process. |
| | |  * @param sourceHandler The ServerHandler for the server from which the |
| | |  *        update was received |
| | |  * @return A PreparedAssuredInfo whose fields are set when some acks must be |
| | |  *         waited for, and left null otherwise. |
| | |  * @throws IOException When an IO exception happens while sending the |
| | |  *         immediate ack back to the sender. |
| | |  */ |
| | | private PreparedAssuredInfo processSafeReadUpdateMsg( |
| | |   UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | |   ChangeNumber cn = update.getChangeNumber(); |
| | |   byte groupId = replicationServer.getGroupId(); |
| | |   byte sourceGroupId = sourceHandler.getGroupId(); |
| | |   List<Short> expectedServers = new ArrayList<Short>(); |
| | |   List<Short> wrongStatusServers = new ArrayList<Short>(); |
| | | |
| | |   // Assured feature does not cross different group ids: acks are requested |
| | |   // only when the sender belongs to our own group. (The test used to be |
| | |   // inverted, which requested acks exactly in the cross-group case.) |
| | |   if (sourceGroupId == groupId) |
| | |   { |
| | |     if (sourceHandler.isLDAPserver()) |
| | |     { |
| | |       // Look for RS eligible for assured |
| | |       for (ServerHandler handler : replicationServers.values()) |
| | |       { |
| | |         if (handler.getGroupId() == groupId) |
| | |         { |
| | |           expectedServers.add(handler.getServerId()); |
| | |         } |
| | |       } |
| | |     } |
| | | |
| | |     // Look for DS eligible for assured |
| | |     for (ServerHandler handler : directoryServers.values()) |
| | |     { |
| | |       // Don't forward the change to the server that just sent it |
| | |       if (handler == sourceHandler) |
| | |       { |
| | |         continue; |
| | |       } |
| | |       if (handler.getGroupId() == groupId) |
| | |       { |
| | |         if (handler.getStatus() == ServerStatus.NORMAL_STATUS) |
| | |         { |
| | |           expectedServers.add(handler.getServerId()); |
| | |         } else |
| | |         { |
| | |           wrongStatusServers.add(handler.getServerId()); |
| | |         } |
| | |       } |
| | |     } |
| | |   } |
| | | |
| | |   // Return computed structures |
| | |   PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); |
| | |   if (!expectedServers.isEmpty()) |
| | |   { |
| | |     // Some other acks to wait for |
| | |     preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn, |
| | |       sourceHandler, expectedServers, wrongStatusServers); |
| | |     preparedAssuredInfo.expectedServers = expectedServers; |
| | |   } else |
| | |   { |
| | |     // No eligible servers found, send the ack immediately |
| | |     AckMsg ack = new AckMsg(cn); |
| | |     sourceHandler.sendAck(ack); |
| | |   } |
| | | |
| | |   return preparedAssuredInfo; |
| | | } |
| | | |
| | | /** |
| | | * Process a just received assured update message in Safe Data mode. If the |
| | | * ack can be sent immediately, it is done here. This will also determine to |
| | | * which suitable servers an ack should be requested from, and which ones are |
| | | * not elligible for an ack request. |
| | | * This method is an helper method for the put method. Have a look at the put |
| | | * method for a better understanding. |
| | | * @param update The just received assured update to process. |
| | | * @param sourceHandler The ServerHandler for the server from which the |
| | | * update was received |
| | | * @return A suitable PreparedAssuredInfo object that contains every needed |
| | | * info to proceed with post to server writers. |
| | | * @throws IOException When an IO exception happens during the update |
| | | * processing. |
| | | */ |
| | | private PreparedAssuredInfo processSafeDataUpdateMsg( |
| | | UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | boolean interestedInAcks = true; |
| | | byte safeDataLevel = update.getSafeDataLevel(); |
| | | byte groupId = replicationServer.getGroupId(); |
| | | byte sourceGroupId = sourceHandler.getGroupId(); |
| | | if (safeDataLevel < (byte) 1) |
| | | { |
| | | // Should never happen |
| | | Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get( |
| | | Short.toString(replicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | logError(errorMsg); |
| | | interestedInAcks = false; |
| | | } else if (sourceGroupId != groupId) |
| | | { |
| | | // Assured feature does not cross different group ids |
| | | interestedInAcks = false; |
| | | } else |
| | | { |
| | | if (sourceHandler.isLDAPserver()) |
| | | { |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | // Immediatly return the ack for an assured message in safe data mode |
| | | // with safe data level 1, coming from a DS. No need to wait for more |
| | | // acks |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | interestedInAcks = false; // No further acks to obtain |
| | | } else |
| | | { |
| | | // level > 1 : We need further acks |
| | | // The message will be posted in assured mode to elligible servers. |
| | | // The embedded safe data level is not changed, and his value will be |
| | | // used by a remote RS to determine if he must send an ack (level > 1) |
| | | // or not (level = 1) |
| | | } |
| | | } else |
| | | { // A RS sent us the safe data message, for sure no futher acks to wait |
| | | interestedInAcks = false; |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | // The original level was 1 so the RS that sent us this message should |
| | | // have already sent his ack to the sender DS. Level 1 has already |
| | | // been reached so no further acks to wait |
| | | // This should not happen in theory as the sender RS server should |
| | | // have sent us a matching not assured message so we should not come |
| | | // to here. |
| | | } else |
| | | { |
| | | // level > 1, so Ack this message to originator RS |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | } |
| | | } |
| | | } |
| | | |
| | | List<Short> expectedServers = new ArrayList<Short>(); |
| | | if (interestedInAcks) |
| | | { |
| | | if (sourceHandler.isLDAPserver()) |
| | | { |
| | | // Look for RS elligible for assured |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | |
| | | // Look for DS elligible for assured |
| | | for (ServerHandler handler : directoryServers.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | | { |
| | | continue; |
| | | } |
| | | if (handler.getGroupId() == groupId) |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | |
| | | // Return computed structures |
| | | PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); |
| | | if (interestedInAcks && (expectedServers.size() > 0)) |
| | | { |
| | | // Some other acks to wait for |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn, |
| | | sourceHandler, update.getSafeDataLevel()); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | | } |
| | | |
| | | if (interestedInAcks && (preparedAssuredInfo.expectedServers == null)) |
| | | { |
| | | // level > 1 and source is a DS but no elligible servers found, send the |
| | | // ack immediatly |
| | | AckMsg ack = new AckMsg(cn); |
| | | sourceHandler.sendAck(ack); |
| | | } |
| | | |
| | | return preparedAssuredInfo; |
| | | } |
| | | |
| | | /** |
| | |  * Handles an ack received from a given server for an assured update. |
| | |  * <p> |
| | |  * When the received ack is the last one expected for the matching update, |
| | |  * the final ack is built and sent back to the server that requested it. |
| | |  * |
| | |  * @param ack The ack message received. |
| | |  * @param ackingServer The server handler of the server that sent the ack. |
| | |  */ |
| | | public void processAck(AckMsg ack, ServerHandler ackingServer) |
| | | { |
| | |   // Look up the bookkeeping object of the update this ack refers to. |
| | |   final ChangeNumber cn = ack.getChangeNumber(); |
| | |   final ExpectedAcksInfo pendingInfo = waitingAcks.get(cn); |
| | |   if (pendingInfo == null) |
| | |   { |
| | |     // The timeout occurred for the update matching this change number and |
| | |     // the ack with timeout error has probably already been sent. |
| | |     return; |
| | |   } |
| | | |
| | |   // Prevent concurrent access from processAck() or AssuredTimeoutTask.run() |
| | |   synchronized (pendingInfo) |
| | |   { |
| | |     if (pendingInfo.isCompleted()) |
| | |     { |
| | |       // Timeout code is sending a timeout ack: do nothing and let it remove |
| | |       // the object from the map. |
| | |       return; |
| | |     } |
| | | |
| | |     if (!pendingInfo.processReceivedAck(ackingServer, ack)) |
| | |     { |
| | |       // Not the last expected ack yet. |
| | |       return; |
| | |     } |
| | | |
| | |     // Last expected ack received: the entry is no longer needed, and the |
| | |     // final ack goes back to the original server immediately. |
| | |     waitingAcks.remove(cn); |
| | |     final AckMsg ackToSend = pendingInfo.createAck(false); |
| | |     final ServerHandler requester = pendingInfo.getRequesterServer(); |
| | |     try |
| | |     { |
| | |       requester.sendAck(ackToSend); |
| | |     } |
| | |     catch (IOException e) |
| | |     { |
| | |       // Sending the ack back failed: log an error and close the connection |
| | |       // to this server. |
| | |       final MessageBuilder builder = new MessageBuilder(); |
| | |       builder.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | |         Short.toString(replicationServer.getServerId()), |
| | |         Short.toString(requester.getServerId()), cn.toString(), baseDn)); |
| | |       builder.append(stackTraceToSingleLineString(e)); |
| | |       logError(builder.toMessage()); |
| | |       stopServer(requester); |
| | |     } |
| | |     // Mark the ack info object as completed to prevent a parallel run of |
| | |     // the timeout code. |
| | |     pendingInfo.completed(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * The code run when the timeout occurs while waiting for acks of the |
| | | * elligible servers. This basically sends a timeout ack (with any additional |
| | | * error info) to the original server that sent an assured update message. |
| | | */ |
| | | private class AssuredTimeoutTask extends TimerTask |
| | | { |
| | | private ChangeNumber cn = null; |
| | | |
| | | /** |
| | | * Constructor for the timer task. |
| | | * @param cn The changenumber of the assured update we are waiting acks for |
| | | */ |
| | | public AssuredTimeoutTask(ChangeNumber cn) |
| | | { |
| | | this.cn = cn; |
| | | } |
| | | |
| | | /** |
| | | * Run when the assured timeout for an assured update message we are waiting |
| | | * acks for occurs. |
| | | */ |
| | | public void run() |
| | | { |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn); |
| | | |
| | | if (expectedAcksInfo != null) |
| | | { |
| | | synchronized (expectedAcksInfo) |
| | | { |
| | | if (expectedAcksInfo.isCompleted()) |
| | | { |
| | | // processAck() code is sending the ack, do nothing and let him |
| | | // remove object from the map |
| | | return; |
| | | } |
| | | // Remove the object from the map as no more needed |
| | | waitingAcks.remove(cn); |
| | | // Create the timeout ack and send him to the server the assured |
| | | // update message came from |
| | | AckMsg finalAck = expectedAcksInfo.createAck(true); |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + Short.toString(replicationServer.getServerId()) + |
| | | " for " + baseDn + |
| | | ", sending timeout for assured update with change " + " number " + |
| | | cn.toString() + " to server id " + |
| | | Short.toString(origServer.getServerId())); |
| | | try |
| | | { |
| | | origServer.sendAck(finalAck); |
| | | } catch (IOException e) |
| | | { |
| | | /* |
| | | * An error happened trying the send back an ack to the server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Short.toString(replicationServer.getServerId()), |
| | | Short.toString(origServer.getServerId()), cn.toString(), baseDn)); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | stopServer(origServer); |
| | | } |
| | | // Mark the ack info object as completed to prevent potential |
| | | // processAck() code parallel run |
| | | expectedAcksInfo.completed(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * Get the baseDn. |
| | | * @return Returns the baseDn. |
| | | */ |
| | | public DN getBaseDn() |
| | | public String getBaseDn() |
| | | { |
| | | return baseDn; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of currently connected servers. |
| | | * |
| | | * @return the number of currently connected servers. |
| | | */ |
| | | private int NumServers() |
| | | { |
| | | return replicationServers.size() + directoryServers.size(); |
| | | } |
| | | |
| | | /** |
| | | * Add an ack to the list of ack received for a given change. |
| | | * |
| | | * @param message The ack message received. |
| | | * @param fromServerId The identifier of the server that sent the ack. |
| | | */ |
| | | public void ack(AckMsg message, short fromServerId) |
| | | { |
| | | /* |
| | | * there are 2 possible cases here : |
| | | * - the message that was acked comes from a server to which |
| | | * we are directly connected. |
| | | * In this case, we can find the handler from the directoryServers map |
| | | * - the message that was acked comes from a server to which we are not |
| | | * connected. |
| | | * In this case we need to find the replication server that forwarded |
| | | * the change and send back the ack to this server. |
| | | */ |
| | | ServerHandler handler = directoryServers.get( |
| | | message.getChangeNumber().getServerId()); |
| | | if (handler != null) |
| | | handler.ack(message, fromServerId); |
| | | else |
| | | { |
| | | ServerHandler.ackChangelog(message, fromServerId); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the destination handlers for a routable message. |
| | | * |
| | | * @param msg The message to route. |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Sends back an ack to the server that sent the change, that server being |
| | |  * identified by the server id carried by the change number. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the change that must be acked. |
| | |  * @param isLDAPserver This boolean indicates if the server that sent the |
| | |  *        change was an LDAP server or a ReplicationServer. |
| | |  */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) |
| | | { |
| | |   // Delegate to the three-argument variant with the originator's id. |
| | |   sendAck(changeNumber, isLDAPserver, changeNumber.getServerId()); |
| | | } |
| | | |
| | | /** |
| | |  * Sends back an ack to a server that sent the change. |
| | |  * <p> |
| | |  * The handler is looked up in the directoryServers map when isLDAPserver is |
| | |  * true, and in the replicationServers map otherwise. If no handler is found |
| | |  * under the given id, an error is logged and nothing is sent. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the change that must be acked. |
| | |  * @param isLDAPserver This boolean indicates if the server that sent the |
| | |  *        change was an LDAP server or a ReplicationServer. |
| | |  * @param serverId The identifier of the server from which we |
| | |  *        received the change. |
| | |  */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, |
| | |   short serverId) |
| | | { |
| | |   ServerHandler handler; |
| | |   if (isLDAPserver) |
| | |   { |
| | |     handler = directoryServers.get(serverId); |
| | |   } else |
| | |   { |
| | |     handler = replicationServers.get(serverId); |
| | |   } |
| | | |
| | |   if (handler == null) |
| | |   { |
| | |     // No handler is registered under this id in the selected map (the server |
| | |     // may for instance have disconnected in the meantime). Log it rather |
| | |     // than failing with a NullPointerException. |
| | |     MessageBuilder mb = new MessageBuilder(); |
| | |     mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString())); |
| | |     mb.append(" No server handler found for server id " + |
| | |       Short.toString(serverId)); |
| | |     logError(mb.toMessage()); |
| | |     return; |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     handler.sendAck(changeNumber); |
| | |   } catch (IOException e) |
| | |   { |
| | |     /* |
| | |      * An error happened trying to send back an ack to this server. |
| | |      * Log an error and close the connection to this server. |
| | |      */ |
| | |     MessageBuilder mb = new MessageBuilder(); |
| | |     mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString())); |
| | |     mb.append(stackTraceToSingleLineString(e)); |
| | |     logError(mb.toMessage()); |
| | |     stopServer(handler); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicationServerDomain. |
| | | */ |
| | | public void shutdown() |
| | |
| | | { |
| | | // Put RS info |
| | | rsInfos.add(serverHandler.toRSInfo()); |
| | | // Put his DSs info |
| | | Map<Short, LightweightServerHandler> lsList = |
| | | serverHandler.getConnectedDSs(); |
| | | for (LightweightServerHandler ls : lsList.values()) |
| | | { |
| | | dsInfos.add(ls.toDSInfo()); |
| | | } |
| | | |
| | | serverHandler.addDSInfos(dsInfos); |
| | | } |
| | | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | |
| | | if (generationId > 0 && (generationId != handler.getGenerationId())) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | baseDn.toNormalizedString(), |
| | | baseDn, |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(handler.getGenerationId()), |
| | | Long.toString(generationId)); |
| | |
| | | synchronized (wrkMonitorData) |
| | | { |
| | | // Here is the RS state : list <serverID, lastChangeNumber> |
| | | // For each LDAP Server, we keep the max CN accross the RSes |
| | | // For each LDAP Server, we keep the max CN across the RSes |
| | | ServerState replServerState = msg.getReplServerDbState(); |
| | | wrkMonitorData.setMaxCNs(replServerState); |
| | | |
| | | // store the remote RS states. |
| | | wrkMonitorData.setRSState(msg.getsenderID(), replServerState); |
| | | |
| | | // Store the remote LDAP servers states |
| | | Iterator<Short> lsidIterator = msg.ldapIterator(); |
| | | while (lsidIterator.hasNext()) |
| | |
| | | } |
| | | } |
| | | } |
| | | |