| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2012-2014 ForgeRock AS |
| | | * Portions Copyright 2012-2015 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | * date of the first missing change. |
| | | */ |
| | | |
| | | |
| | | /** For each LDAP server, its server state. */ |
| | | private ConcurrentMap<Integer, ServerState> ldapStates = |
| | | new ConcurrentHashMap<Integer, ServerState>(); |
| | | |
| | | private ConcurrentMap<Integer, ServerState> ldapStates = new ConcurrentHashMap<>(); |
| | | /** A Map containing the ServerStates of each RS. */ |
| | | private ConcurrentMap<Integer, ServerState> rsStates = |
| | | new ConcurrentHashMap<Integer, ServerState>(); |
| | | |
| | | private ConcurrentMap<Integer, ServerState> rsStates = new ConcurrentHashMap<>(); |
| | | /** For each LDAP server, the last(max) CSN it published. */ |
| | | private ConcurrentMap<Integer, CSN> maxCSNs = |
| | | new ConcurrentHashMap<Integer, CSN>(); |
| | | private ConcurrentMap<Integer, CSN> maxCSNs = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * For each LDAP server, an approximation of the date of the first missing |
| | | * change. |
| | | */ |
| | | private ConcurrentMap<Integer, Long> firstMissingDates = |
| | | new ConcurrentHashMap<Integer, Long>(); |
| | | |
| | | private ConcurrentMap<Integer, Long> missingChanges = |
| | | new ConcurrentHashMap<Integer, Long>(); |
| | | |
| | | private ConcurrentMap<Integer, Long> missingChangesRS = |
| | | new ConcurrentHashMap<Integer, Long>(); |
| | | |
| | | /** For each LDAP server, an approximation of the date of the first missing change. */ |
| | | private ConcurrentMap<Integer, Long> firstMissingDates = new ConcurrentHashMap<>(); |
| | | private ConcurrentMap<Integer, Long> missingChanges = new ConcurrentHashMap<>(); |
| | | private ConcurrentMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * Get an approximation of the latency delay of the replication. |
| | |
| | | public long getApproxDelay(int serverId) |
| | | { |
| | | Long afmd = firstMissingDates.get(serverId); |
| | | if (afmd != null && afmd > 0) |
| | | if (afmd != null && afmd > 0) { |
| | | return (TimeThread.getTime() - afmd) / 1000; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | */ |
| | | public long getApproxFirstMissingDate(int serverId) |
| | | { |
| | | Long res = firstMissingDates.get(serverId); |
| | | if (res != null) |
| | | return res; |
| | | return 0; |
| | | return getValueOrZero(firstMissingDates.get(serverId)); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getMissingChanges(int serverId) |
| | | { |
| | | Long res = missingChanges.get(serverId); |
| | | if (res != null) |
| | | return res; |
| | | return 0; |
| | | return getValueOrZero(missingChanges.get(serverId)); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getMissingChangesRS(int serverId) |
| | | { |
| | | Long res = missingChangesRS.get(serverId); |
| | | if (res != null) |
| | | return res; |
| | | return 0; |
| | | return getValueOrZero(missingChangesRS.get(serverId)); |
| | | } |
| | | |
| | | private long getValueOrZero(Long res) |
| | | { |
| | | return res != null ? res : 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void completeComputing() |
| | | { |
| | | String mds = ""; |
| | | StringBuilder mds = new StringBuilder(); |
| | | |
| | | // Computes the missing changes counters for LDAP servers |
| | | // For each LSi , |
| | |
| | | { |
| | | final Integer lsiServerId = entry.getKey(); |
| | | final ServerState lsiState = entry.getValue(); |
| | | |
| | | long lsiMissingChanges = 0; |
| | | if (lsiState != null) { |
| | | for (Entry<Integer, CSN> entry2 : maxCSNs.entrySet()) |
| | | { |
| | | final Integer lsjServerId = entry2.getKey(); |
| | | final CSN lsjMaxCSN = entry2.getValue(); |
| | | CSN lsiLastCSN = lsiState.getCSN(lsjServerId); |
| | | |
| | | int missingChangesLsiLsj = CSN.diffSeqNum(lsjMaxCSN, lsiLastCSN); |
| | | |
| | | if (logger.isTraceEnabled()) { |
| | | mds += "+ diff(" + lsjMaxCSN + "-" |
| | | + lsiLastCSN + ")=" + missingChangesLsiLsj; |
| | | } |
| | | /* |
| | | Regarding a DS that is generating changes. If it is a local DS1, |
| | | we get its server state, store it, then retrieve server states of |
| | | remote DSs. When a remote server state is coming, it may contain |
| | | a CSN for DS1 which is newer than the one we locally |
| | | stored in the server state of DS1. To prevent seeing DS1 has |
| | | missing changes whereas it is wrong, we replace the value with 0 |
| | | if it is a low value. We cannot overwrite big values as they may be |
| | | useful for a local server retrieving changes it generated earlier, |
| | | when it is recovering from an old snapshot and the local RS is |
| | | sending him the changes it is missing. |
| | | */ |
| | | if (lsjServerId.equals(lsiServerId) && missingChangesLsiLsj <= 50) |
| | | { |
| | | missingChangesLsiLsj = 0; |
| | | if (logger.isTraceEnabled()) { |
| | | mds += |
| | | " (diff replaced by 0 as for server id " + lsiServerId + ")"; |
| | | } |
| | | } |
| | | |
| | | lsiMissingChanges += missingChangesLsiLsj; |
| | | } |
| | | } |
| | | long lsiMissingChanges = computeMissingChanges(mds, lsiServerId, lsiState); |
| | | if (logger.isTraceEnabled()) { |
| | | mds += "=" + lsiMissingChanges; |
| | | mds.append("=" + lsiMissingChanges); |
| | | } |
| | | |
| | | this.missingChanges.put(lsiServerId, lsiMissingChanges); |
| | | } |
| | | |
| | |
| | | { |
| | | final Integer lsiServerId = entry.getKey(); |
| | | final ServerState lsiState = entry.getValue(); |
| | | |
| | | long lsiMissingChanges = 0; |
| | | if (lsiState != null) |
| | | { |
| | | for (Entry<Integer, CSN> entry2 : maxCSNs.entrySet()) |
| | | { |
| | | final Integer lsjServerId = entry2.getKey(); |
| | | final CSN lsjMaxCSN = entry2.getValue(); |
| | | CSN lsiLastCSN = lsiState.getCSN(lsjServerId); |
| | | |
| | | int missingChangesLsiLsj = CSN.diffSeqNum(lsjMaxCSN, lsiLastCSN); |
| | | |
| | | if (logger.isTraceEnabled()) { |
| | | mds += "+ diff(" + lsjMaxCSN + "-" |
| | | + lsiLastCSN + ")=" + missingChangesLsiLsj; |
| | | } |
| | | lsiMissingChanges += missingChangesLsiLsj; |
| | | } |
| | | long lsiMissingChanges = computeMissingChanges(mds, Integer.MIN_VALUE, lsiState); |
| | | if (logger.isTraceEnabled()) { |
| | | mds.append("=" + lsiMissingChanges); |
| | | } |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | mds += "=" + lsiMissingChanges; |
| | | } |
| | | |
| | | this.missingChangesRS.put(lsiServerId, lsiMissingChanges); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace( |
| | | "Complete monitor data : Missing changes ("+ lsiServerId +")=" + mds); |
| | | logger.trace("Complete monitor data : Missing changes (" + lsiServerId + ")=" + mds); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private long computeMissingChanges(StringBuilder mds, final Integer lsiServerId, final ServerState lsiState) |
| | | { |
| | | long lsiMissingChanges = 0; |
| | | if (lsiState != null) { |
| | | for (Entry<Integer, CSN> entry2 : maxCSNs.entrySet()) |
| | | { |
| | | final Integer lsjServerId = entry2.getKey(); |
| | | final CSN lsjMaxCSN = entry2.getValue(); |
| | | CSN lsiLastCSN = lsiState.getCSN(lsjServerId); |
| | | |
| | | int missingChangesLsiLsj = CSN.diffSeqNum(lsjMaxCSN, lsiLastCSN); |
| | | |
| | | if (logger.isTraceEnabled()) { |
| | | mds.append("+ diff(" + lsjMaxCSN + "-" + lsiLastCSN + ")=" + missingChangesLsiLsj); |
| | | } |
| | | /* |
| | | THIS BIT OF CODE IS IRRELEVANT TO RSs. |
| | | Regarding a DS that is generating changes. If it is a local DS1, |
| | | we get its server state, store it, then retrieve server states of |
| | | remote DSs. When a remote server state is coming, it may contain |
| | | a CSN for DS1 which is newer than the one we locally |
| | | stored in the server state of DS1. To prevent seeing DS1 has |
| | | missing changes whereas it is wrong, we replace the value with 0 |
| | | if it is a low value. We cannot overwrite big values as they may be |
| | | useful for a local server retrieving changes it generated earlier, |
| | | when it is recovering from an old snapshot and the local RS is |
| | | sending him the changes it is missing. |
| | | */ |
| | | if (lsjServerId.equals(lsiServerId) && missingChangesLsiLsj <= 50) |
| | | { |
| | | missingChangesLsiLsj = 0; |
| | | if (logger.isTraceEnabled()) { |
| | | mds.append(" (diff replaced by 0 as for server id " + lsiServerId + ")"); |
| | | } |
| | | } |
| | | |
| | | lsiMissingChanges += missingChangesLsiLsj; |
| | | } |
| | | } |
| | | return lsiMissingChanges; |
| | | } |
| | | |
| | | /** |
| | | * Returns a <code>String</code> object representing this |
| | | * object's value. |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String mds = "Monitor data=\n"; |
| | | StringBuilder mds = new StringBuilder("Monitor data=\n"); |
| | | |
| | | // maxCSNs |
| | | for (Entry<Integer, CSN> entry : maxCSNs.entrySet()) |
| | | { |
| | | final Integer serverId = entry.getKey(); |
| | | final CSN csn = entry.getValue(); |
| | | mds += "\nmaxCSNs(" + serverId + ")= " + csn.toStringUI(); |
| | | mds.append("\nmaxCSNs(" + serverId + ")= " + csn.toStringUI()); |
| | | } |
| | | |
| | | // LDAP data |
| | |
| | | { |
| | | final Integer serverId = entry.getKey(); |
| | | final ServerState ss = entry.getValue(); |
| | | mds += "\nLSData(" + serverId + ")=\t" |
| | | mds.append("\nLSData(" + serverId + ")=\t" |
| | | + "state=[" + ss + "] afmd=" + getApproxFirstMissingDate(serverId) |
| | | + " missingDelay=" + getApproxDelay(serverId) |
| | | + " missingCount=" + missingChanges.get(serverId); |
| | | + " missingCount=" + missingChanges.get(serverId)); |
| | | } |
| | | |
| | | // RS data |
| | |
| | | { |
| | | final Integer serverId = entry.getKey(); |
| | | final ServerState ss = entry.getValue(); |
| | | mds += "\nRSData(" + serverId + ")=\t" + "state=[" + ss |
| | | + "] missingCount=" + missingChangesRS.get(serverId); |
| | | mds.append("\nRSData(" + serverId + ")=\t" + "state=[" + ss |
| | | + "] missingCount=" + missingChangesRS.get(serverId)); |
| | | } |
| | | |
| | | mds += "\n--"; |
| | | return mds; |
| | | mds.append("\n--"); |
| | | return mds.toString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void setMaxCSN(CSN newCSN) |
| | | { |
| | | if (newCSN == null) return; |
| | | if (newCSN == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | int serverId = newCSN.getServerId(); |
| | | CSN currentMaxCSN = maxCSNs.get(serverId); |
| | | if (currentMaxCSN == null) |
| | |
| | | } |
| | | else if (newCSN.isNewerThan(currentMaxCSN)) |
| | | { |
| | | // TODO JNR should we check for unsuccessful replace? |
| | | maxCSNs.replace(serverId, newCSN); |
| | | } |
| | | } |