| | |
| | | * The needed info for each received assured update message we are waiting |
| | | * acks for. |
| | | * <p> |
| | | * Key: a change number matching a received update message which requested |
| | | * Key: a CSN matching a received update message which requested |
| | | * assured mode usage (either safe read or safe data mode) |
| | | * <p> |
| | | * Value: The object holding all the information needed about the already received acks |
| | |
| | | * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub |
| | | * classes javadoc. |
| | | */ |
| | | private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks = |
| | | new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>(); |
| | | private final Map<CSN, ExpectedAcksInfo> waitingAcks = |
| | | new ConcurrentHashMap<CSN, ExpectedAcksInfo>(); |
| | | |
| | | /** |
| | | * The timer used to run the timeout code (timer tasks) for the assured update |
| | |
| | | public void put(UpdateMsg update, ServerHandler sourceHandler) |
| | | throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | int serverId = cn.getServerId(); |
| | | CSN csn = update.getCSN(); |
| | | int serverId = csn.getServerId(); |
| | | |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | |
| | | // The following timer will time out and send a timeout ack to the |
| | | // requester if the acks are not received in time. The timer will also |
| | | // remove the object from this map. |
| | | waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo); |
| | | waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo); |
| | | |
| | | // Arm timer for this assured update message (wait for acks until it |
| | | // times out) |
| | | AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn); |
| | | AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn); |
| | | assuredTimeoutTimer.schedule(assuredTimeoutTask, |
| | | localReplicationServer.getAssuredTimeout()); |
| | | // Purge timer every 100 treated messages |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug("update " + update.getChangeNumber() |
| | | debug("update " + update.getCSN() |
| | | + " will not be sent to replication server " |
| | | + rsHandler.getServerId() + " with generation id " |
| | | + rsHandler.getGenerationId() + " different from local " |
| | |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | { |
| | | debug("update " + update.getChangeNumber() |
| | | debug("update " + update.getCSN() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " with generation id " |
| | | + dsHandler.getGenerationId() + " different from local " |
| | |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | debug("update " + update.getChangeNumber() |
| | | debug("update " + update.getCSN() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " as it is in full update"); |
| | | } |
| | |
| | | private PreparedAssuredInfo processSafeReadUpdateMsg( |
| | | UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | CSN csn = update.getCSN(); |
| | | byte groupId = localReplicationServer.getGroupId(); |
| | | byte sourceGroupId = sourceHandler.getGroupId(); |
| | | List<Integer> expectedServers = new ArrayList<Integer>(); |
| | |
| | | if (expectedServers.size() > 0) |
| | | { |
| | | // Some other acks to wait for |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn, |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn, |
| | | sourceHandler, expectedServers, wrongStatusServers); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | | } |
| | |
| | | if (preparedAssuredInfo.expectedServers == null) |
| | | { |
| | | // No eligible servers found, send the ack immediately |
| | | sourceHandler.send(new AckMsg(cn)); |
| | | sourceHandler.send(new AckMsg(csn)); |
| | | } |
| | | |
| | | return preparedAssuredInfo; |
| | |
| | | private PreparedAssuredInfo processSafeDataUpdateMsg( |
| | | UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | CSN csn = update.getCSN(); |
| | | boolean interestedInAcks = false; |
| | | byte safeDataLevel = update.getSafeDataLevel(); |
| | | byte groupId = localReplicationServer.getGroupId(); |
| | |
| | | * mode with safe data level 1, coming from a DS. No need to wait |
| | | * for more acks |
| | | */ |
| | | sourceHandler.send(new AckMsg(cn)); |
| | | sourceHandler.send(new AckMsg(csn)); |
| | | } else |
| | | { |
| | | /** |
| | |
| | | */ |
| | | if (safeDataLevel > (byte) 1) |
| | | { |
| | | sourceHandler.send(new AckMsg(cn)); |
| | | sourceHandler.send(new AckMsg(csn)); |
| | | } |
| | | } |
| | | } |
| | |
| | | byte finalSdl = (nExpectedServers >= neededAdditionalServers) ? |
| | | (byte)sdl : // Keep level as it was |
| | | (byte)(nExpectedServers+1); // Change level to match what's available |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn, |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn, |
| | | sourceHandler, finalSdl, expectedServers); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | | } else |
| | | { |
| | | // level > 1 and source is a DS but no eligible servers found, send the |
| | | // ack immediately |
| | | sourceHandler.send(new AckMsg(cn)); |
| | | sourceHandler.send(new AckMsg(csn)); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // Retrieve the expected acks info for the update matching the original |
| | | // sent update. |
| | | ChangeNumber cn = ack.getChangeNumber(); |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn); |
| | | CSN csn = ack.getCSN(); |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); |
| | | |
| | | if (expectedAcksInfo != null) |
| | | { |
| | |
| | | if (expectedAcksInfo.processReceivedAck(ackingServer, ack)) |
| | | { |
| | | // Remove the object from the map as no more needed |
| | | waitingAcks.remove(cn); |
| | | waitingAcks.remove(csn); |
| | | AckMsg finalAck = expectedAcksInfo.createAck(false); |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | try |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | cn.toString(), baseDn)); |
| | | csn.toString(), baseDn)); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | stopServer(origServer, false); |
| | |
| | | } |
| | | } |
| | | } |
| | | /* Else the timeout occurred for the update matching this change number |
| | | /* Else the timeout occurred for the update matching this CSN |
| | | * and the ack with timeout error has probably already been sent. |
| | | */ |
| | | } |
| | |
| | | */ |
| | | private class AssuredTimeoutTask extends TimerTask |
| | | { |
| | | private ChangeNumber cn = null; |
| | | private CSN csn = null; |
| | | |
| | | /** |
| | | * Constructor for the timer task. |
| | | * @param cn The changeNumber of the assured update we are waiting acks for |
| | | * @param csn The CSN of the assured update we are waiting acks for |
| | | */ |
| | | public AssuredTimeoutTask(ChangeNumber cn) |
| | | public AssuredTimeoutTask(CSN csn) |
| | | { |
| | | this.cn = cn; |
| | | this.csn = csn; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn); |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); |
| | | |
| | | if (expectedAcksInfo != null) |
| | | { |
| | |
| | | return; |
| | | } |
| | | // Remove the object from the map as no more needed |
| | | waitingAcks.remove(cn); |
| | | waitingAcks.remove(csn); |
| | | // Create the timeout ack and send it to the server the assured |
| | | // update message came from |
| | | AckMsg finalAck = expectedAcksInfo.createAck(true); |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("sending timeout for assured update with change number " + cn |
| | | debug("sending timeout for assured update with CSN " + csn |
| | | + " to serverId=" + origServer.getServerId()); |
| | | } |
| | | try |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | cn.toString(), baseDn)); |
| | | csn.toString(), baseDn)); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | stopServer(origServer, false); |
| | |
| | | * |
| | | * @param serverId |
| | | * Identifier of the server for which the cursor is created. |
| | | * @param startAfterCN |
| | | * @param startAfterCSN |
| | | * Starting point for the cursor. |
| | | * @return the created {@link ReplicaDBCursor}. Null when no DB is |
| | | * available or the DB is empty for the provided serverId. |
| | | * @return the created {@link ReplicaDBCursor}. Null when no DB is available |
| | | * or the DB is empty for the provided serverId. |
| | | */ |
| | | public ReplicaDBCursor getCursorFrom(int serverId, |
| | | ChangeNumber startAfterCN) |
| | | public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler == null) |
| | |
| | | ReplicaDBCursor cursor; |
| | | try |
| | | { |
| | | cursor = dbHandler.generateCursorFrom(startAfterCN); |
| | | cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** |
| | | * Count the number of changes in the replication changelog for the provided |
| | | * serverID, between 2 provided changenumbers. |
| | | * serverID, between 2 provided CSNs. |
| | | * @param serverId Identifier of the server for which to compute the count. |
| | | * @param from lower limit changenumber. |
| | | * @param to upper limit changenumber. |
| | | * @param from lower limit CSN. |
| | | * @param to upper limit CSN. |
| | | * @return the number of changes. |
| | | */ |
| | | public long getCount(int serverId, ChangeNumber from, ChangeNumber to) |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler != null) |
| | |
| | | * <pre> |
| | | * s1 s2 s3 |
| | | * -- -- -- |
| | | * cn31 |
| | | * cn15 |
| | | * csn31 |
| | | * csn15 |
| | | * |
| | | * ----------------------------------------- eligibleCN |
| | | * cn14 |
| | | * cn26 |
| | | * cn13 |
| | | * ----------------------------------------- eligibleCSN |
| | | * csn14 |
| | | * csn26 |
| | | * csn13 |
| | | * </pre> |
| | | * |
| | | * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31 |
| | | * The eligibleState is : s1;csn14 / s2;csn26 / s3;csn31 |
| | | * |
| | | * @param eligibleCN The provided eligibleCN. |
| | | * @param eligibleCSN |
| | | * The provided eligible CSN. |
| | | * @return The computed eligible server state. |
| | | */ |
| | | public ServerState getEligibleState(ChangeNumber eligibleCN) |
| | | public ServerState getEligibleState(CSN eligibleCSN) |
| | | { |
| | | ServerState dbState = getDbServerState(); |
| | | |
| | | // The result is initialized from the dbState. |
| | | // From it, we don't want to keep the changes newer than eligibleCN. |
| | | // From it, we don't want to keep the changes newer than eligibleCSN. |
| | | ServerState result = dbState.duplicate(); |
| | | |
| | | if (eligibleCN != null) |
| | | if (eligibleCSN != null) |
| | | { |
| | | for (int serverId : dbState) |
| | | { |
| | | DbHandler h = sourceDbHandlers.get(serverId); |
| | | ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId); |
| | | CSN mostRecentDbCSN = dbState.getCSN(serverId); |
| | | try { |
| | | // Is the most recent change in the Db newer than eligible CN ? |
| | | // if yes (like cn15 in the example above), then we have to go back |
| | | // to the Db and look for the change older than eligible CN (cn14) |
| | | if (eligibleCN.olderOrEqual(mostRecentDbCN)) { |
| | | // let's try to seek the first change <= eligibleCN |
| | | // Is the most recent change in the Db newer than eligible CSN ? |
| | | // if yes (like csn15 in the example above), then we have to go back |
| | | // to the Db and look for the change older than eligible CSN (csn14) |
| | | if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) |
| | | { |
| | | // let's try to seek the first change <= eligibleCSN |
| | | ReplicaDBCursor cursor = null; |
| | | try { |
| | | cursor = h.generateCursorFrom(eligibleCN); |
| | | cursor = h.generateCursorFrom(eligibleCSN); |
| | | if (cursor != null && cursor.getChange() != null) { |
| | | ChangeNumber newCN = cursor.getChange().getChangeNumber(); |
| | | result.update(newCN); |
| | | CSN newCSN = cursor.getChange().getCSN(); |
| | | result.update(newCSN); |
| | | } |
| | | } catch (ChangelogException e) { |
| | | // there's no change older than eligibleCN (case of s3/cn31) |
| | | result.update(new ChangeNumber(0, 0, serverId)); |
| | | // there's no change older than eligibleCSN (case of s3/csn31) |
| | | result.update(new CSN(0, 0, serverId)); |
| | | } finally { |
| | | close(cursor); |
| | | } |
| | | } else { |
| | | // for this serverId, all changes in the ChangelogDb are older |
| | | // than eligibleCN , the most recent in the db is our guy. |
| | | result.update(mostRecentDbCN); |
| | | // than eligibleCSN, the most recent in the db is our guy. |
| | | result.update(mostRecentDbCSN); |
| | | } |
| | | } catch (Exception e) { |
| | | logError(ERR_WRITER_UNEXPECTED_EXCEPTION |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat |
| | | * state. |
| | | * For each DS, take the oldest CN from the changetime heartbeat state |
| | | * and from the changelog db last CN. Can be null. |
| | | * @return the eligible CN. |
| | | * Returns the eligible CSN for that domain - relies on the |
| | | * ChangeTimeHeartbeat state. |
| | | * <p> |
| | | * For each DS, take the oldest CSN from the changetime heartbeat state and |
| | | * from the changelog db last CSN. Can be null. |
| | | * |
| | | * @return the eligible CSN. |
| | | */ |
| | | public ChangeNumber getEligibleCN() |
| | | public CSN getEligibleCSN() |
| | | { |
| | | ChangeNumber eligibleCN = null; |
| | | CSN eligibleCSN = null; |
| | | |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | |
| | | int serverId = db.getServerId(); |
| | | |
| | | // Should it be considered for eligibility ? |
| | | ChangeNumber heartbeatLastCN = |
| | | getChangeTimeHeartbeatState().getChangeNumber(serverId); |
| | | CSN heartbeatLastCSN = |
| | | getChangeTimeHeartbeatState().getCSN(serverId); |
| | | |
| | | // If the most recent UpdateMsg or CLHeartbeatMsg received is very old |
| | | // then the domain is considered down and not considered for eligibility |
| | |
| | | continue; |
| | | } |
| | | |
| | | ChangeNumber changelogLastCN = db.getLastChange(); |
| | | if (changelogLastCN != null |
| | | && (eligibleCN == null || changelogLastCN.newer(eligibleCN))) |
| | | CSN changelogLastCSN = db.getLastChange(); |
| | | if (changelogLastCSN != null |
| | | && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN))) |
| | | { |
| | | eligibleCN = changelogLastCN; |
| | | eligibleCSN = changelogLastCSN; |
| | | } |
| | | if (heartbeatLastCN != null |
| | | && (eligibleCN == null || heartbeatLastCN.newer(eligibleCN))) |
| | | if (heartbeatLastCSN != null |
| | | && (eligibleCSN == null || heartbeatLastCSN.newer(eligibleCSN))) |
| | | { |
| | | eligibleCN = heartbeatLastCN; |
| | | eligibleCSN = heartbeatLastCSN; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debug("getEligibleCN() returns result =" + eligibleCN); |
| | | debug("getEligibleCSN() returns result =" + eligibleCSN); |
| | | } |
| | | return eligibleCN; |
| | | return eligibleCSN; |
| | | } |
| | | |
| | | private boolean isServerConnected(int serverId) |
| | |
| | | |
| | | |
| | | /** |
| | | * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp) |
| | | * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp) |
| | | * value received, and forwarding the message to the other RSes. |
| | | * @param senderHandler The handler for the server that sent the heartbeat. |
| | | * @param msg The message to process. |
| | |
| | | |
| | | try |
| | | { |
| | | storeReceivedCTHeartbeat(msg.getChangeNumber()); |
| | | storeReceivedCTHeartbeat(msg.getCSN()); |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | // If we are the first replication server warned, |
| | |
| | | |
| | | /** |
| | | * Store a change time value received from a data server. |
| | | * @param cn The provided change time. |
| | | * @param csn The provided change time. |
| | | */ |
| | | public void storeReceivedCTHeartbeat(ChangeNumber cn) |
| | | public void storeReceivedCTHeartbeat(CSN csn) |
| | | { |
| | | // TODO: Maybe we can spare processing by only storing CN (timestamp) |
| | | // TODO: Maybe we can spare processing by only storing CSN (timestamp) |
| | | // instead of a server state. |
| | | getChangeTimeHeartbeatState().update(cn); |
| | | getChangeTimeHeartbeatState().update(csn); |
| | | } |
| | | |
| | | /** |
| | | * This method counts the changes, server by server: |
| | | * - from a serverState start point |
| | | * - to (inclusive) an end point (the provided endCN). |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startState The provided start server state. |
| | | * @param endCN The provided end change number. |
| | | * @return The number of changes between startState and endCN. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startState and endCSN. |
| | | */ |
| | | public long getEligibleCount(ServerState startState, ChangeNumber endCN) |
| | | public long getEligibleCount(ServerState startState, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | |
| | | for (int serverId : getDbServerState()) |
| | | { |
| | | ChangeNumber startCN = startState.getChangeNumber(serverId); |
| | | long serverIdRes = getCount(serverId, startCN, endCN); |
| | | CSN startCSN = startState.getCSN(serverId); |
| | | long serverIdRes = getCount(serverId, startCSN, endCSN); |
| | | |
| | | // The startPoint is excluded when counting the ECL eligible changes |
| | | if (startCN != null && serverIdRes > 0) |
| | | if (startCSN != null && serverIdRes > 0) |
| | | { |
| | | serverIdRes--; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * This method counts the changes, server by server: |
| | | * - from a start CN |
| | | * - to (inclusive) an end point (the provided endCN). |
| | | * @param startCN The provided start changeNumber. |
| | | * @param endCN The provided end change number. |
| | | * @return The number of changes between startCN and endCN. |
| | | * This method counts the changes, server by server: |
| | | * - from a start CSN |
| | | * - to (inclusive) an end point (the provided endCSN). |
| | | * @param startCSN The provided start CSN. |
| | | * @param endCSN The provided end CSN. |
| | | * @return The number of changes between startCSN and endCSN. |
| | | */ |
| | | public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN) |
| | | public long getEligibleCount(CSN startCSN, CSN endCSN) |
| | | { |
| | | long res = 0; |
| | | for (int serverId : getDbServerState()) { |
| | | ChangeNumber lStartCN = |
| | | new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId); |
| | | res += getCount(serverId, lStartCN, endCN); |
| | | CSN lStartCSN = |
| | | new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId); |
| | | res += getCount(serverId, lStartCSN, endCSN); |
| | | } |
| | | return res; |
| | | } |