| | |
| | | import java.util.concurrent.ConcurrentMap; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | |
| | | |
| | | /** |
| | | * |
| | | * - For each server, the max (most recent) CN produced |
| | | * - For each server, the max (most recent) CSN produced |
| | | * |
| | | * - For each server, its state, i.e. the last change processed from each |
| | | * other LDAP server. |
| | |
| | | private ConcurrentMap<Integer, ServerState> rsStates = |
| | | new ConcurrentHashMap<Integer, ServerState>(); |
| | | |
| | | /** For each LDAP server, the last(max) CN it published. */ |
| | | private ConcurrentMap<Integer, ChangeNumber> maxCNs = |
| | | new ConcurrentHashMap<Integer, ChangeNumber>(); |
| | | /** For each LDAP server, the last(max) CSN it published. */ |
| | | private ConcurrentMap<Integer, CSN> maxCSNs = |
| | | new ConcurrentHashMap<Integer, CSN>(); |
| | | |
| | | /** |
| | | * For each LDAP server, an approximation of the date of the first missing |
| | |
| | | |
| | | long lsiMissingChanges = 0; |
| | | if (lsiState != null) { |
| | | for (Entry<Integer, ChangeNumber> entry2 : maxCNs.entrySet()) |
| | | for (Entry<Integer, CSN> entry2 : maxCSNs.entrySet()) |
| | | { |
| | | final Integer lsjServerId = entry2.getKey(); |
| | | final ChangeNumber lsjMaxCN = entry2.getValue(); |
| | | ChangeNumber lsiLastCN = lsiState.getChangeNumber(lsjServerId); |
| | | final CSN lsjMaxCSN = entry2.getValue(); |
| | | CSN lsiLastCSN = lsiState.getCSN(lsjServerId); |
| | | |
| | | int missingChangesLsiLsj = |
| | | ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); |
| | | int missingChangesLsiLsj = CSN.diffSeqNum(lsjMaxCSN, lsiLastCSN); |
| | | |
| | | if (debugEnabled()) { |
| | | mds += "+ diff(" + lsjMaxCN + "-" |
| | | + lsiLastCN + ")=" + missingChangesLsiLsj; |
| | | mds += "+ diff(" + lsjMaxCSN + "-" |
| | | + lsiLastCSN + ")=" + missingChangesLsiLsj; |
| | | } |
| | | /* |
| | | Consider a DS that is generating changes. If it is a local DS1, |
| | | we get its server state, store it, then retrieve server states of |
| | | remote DSs. When a remote server state is coming, it may contain |
| | | a change number for DS1 which is newer than the one we locally |
| | | a CSN for DS1 which is newer than the one we locally |
| | | stored in the server state of DS1. To avoid wrongly reporting that DS1 |
| | | has missing changes, we replace the value with 0 |
| | | if it is a low value. We cannot overwrite big values as they may be |
| | |
| | | long lsiMissingChanges = 0; |
| | | if (lsiState != null) |
| | | { |
| | | for (Entry<Integer, ChangeNumber> entry2 : maxCNs.entrySet()) |
| | | for (Entry<Integer, CSN> entry2 : maxCSNs.entrySet()) |
| | | { |
| | | final Integer lsjServerId = entry2.getKey(); |
| | | final ChangeNumber lsjMaxCN = entry2.getValue(); |
| | | ChangeNumber lsiLastCN = lsiState.getChangeNumber(lsjServerId); |
| | | final CSN lsjMaxCSN = entry2.getValue(); |
| | | CSN lsiLastCSN = lsiState.getCSN(lsjServerId); |
| | | |
| | | int missingChangesLsiLsj = |
| | | ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); |
| | | int missingChangesLsiLsj = CSN.diffSeqNum(lsjMaxCSN, lsiLastCSN); |
| | | |
| | | if (debugEnabled()) { |
| | | mds += "+ diff(" + lsjMaxCN + "-" |
| | | + lsiLastCN + ")=" + missingChangesLsiLsj; |
| | | mds += "+ diff(" + lsjMaxCSN + "-" |
| | | + lsiLastCSN + ")=" + missingChangesLsiLsj; |
| | | } |
| | | lsiMissingChanges += missingChangesLsiLsj; |
| | | } |
| | |
| | | { |
| | | mds += "=" + lsiMissingChanges; |
| | | } |
| | | this.missingChangesRS.put(lsiServerId,lsiMissingChanges); |
| | | this.missingChangesRS.put(lsiServerId, lsiMissingChanges); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | { |
| | | String mds = "Monitor data=\n"; |
| | | |
| | | // maxCNs |
| | | for (Entry<Integer, ChangeNumber> entry : maxCNs.entrySet()) |
| | | // maxCSNs |
| | | for (Entry<Integer, CSN> entry : maxCSNs.entrySet()) |
| | | { |
| | | final Integer serverId = entry.getKey(); |
| | | final ChangeNumber cn = entry.getValue(); |
| | | mds += "\nmaxCNs(" + serverId + ")= " + cn.toStringUI(); |
| | | final CSN csn = entry.getValue(); |
| | | mds += "\nmaxCSNs(" + serverId + ")= " + csn.toStringUI(); |
| | | } |
| | | |
| | | // LDAP data |
| | |
| | | } |
| | | |
| | | /** |
| | | * From a provided state, sets the max CN of the monitor data. |
| | | * From a provided state, sets the max CSN of the monitor data. |
| | | * @param state the provided state. |
| | | */ |
| | | public void setMaxCNs(ServerState state) |
| | | public void setMaxCSNs(ServerState state) |
| | | { |
| | | for (Integer serverId : state) { |
| | | ChangeNumber newCN = state.getChangeNumber(serverId); |
| | | setMaxCN(serverId, newCN); |
| | | CSN newCSN = state.getCSN(serverId); |
| | | setMaxCSN(serverId, newCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * For the provided serverId, sets the provided CN as the max if |
| | | * For the provided serverId, sets the provided CSN as the max if |
| | | * it is newer than the current max. |
| | | * @param serverId the provided serverId |
| | | * @param newCN the provided new CN |
| | | * @param newCSN the provided new CSN |
| | | */ |
| | | public void setMaxCN(int serverId, ChangeNumber newCN) |
| | | public void setMaxCSN(int serverId, CSN newCSN) |
| | | { |
| | | if (newCN==null) return; |
| | | if (newCSN==null) return; |
| | | |
| | | ChangeNumber currentMaxCN = maxCNs.get(serverId); |
| | | if (currentMaxCN == null) |
| | | CSN currentMaxCSN = maxCSNs.get(serverId); |
| | | if (currentMaxCSN == null) |
| | | { |
| | | maxCNs.put(serverId, newCN); |
| | | maxCSNs.put(serverId, newCSN); |
| | | } |
| | | else if (newCN.newer(currentMaxCN)) |
| | | else if (newCSN.newer(currentMaxCSN)) |
| | | { |
| | | maxCNs.replace(serverId, newCN); |
| | | maxCSNs.replace(serverId, newCSN); |
| | | } |
| | | } |
| | | |