opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -312,6 +312,26 @@
  }

  /**
   * Get the largest ChangeNumber held in this state.
   * <p>
   * The scan is performed while holding the monitor of the underlying
   * map, and an entry replaces the current maximum whenever its
   * {@code newer()} comparison against that maximum is true.
   *
   * @return the largest ChangeNumber, or {@code null} when the state
   *         contains no ChangeNumber at all.
   */
  public ChangeNumber getMaxChangeNumber()
  {
    ChangeNumber maxCN = null;

    synchronized (list)
    {
      // Only the stored change numbers matter here, so walk the values
      // directly instead of iterating the keys and looking each one up again.
      for (ChangeNumber candidate : list.values())
      {
        if ((maxCN == null) || candidate.newer(maxCN))
        {
          maxCN = candidate;
        }
      }
    }
    return maxCN;
  }

  /**
   * Add the tail into resultByteArray at position pos.
   */
  private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -571,8 +571,8 @@ catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getGeneralizedState, read: " + " key=" + key + " genServerState returned is null" + TRACER.debugInfo("In DraftCNDbHandler.getValue, read: " + " key=" + key + " value returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + @@ -586,4 +586,70 @@ } return value; } /** * Get the CN associated to a provided key. * @param key the provided key. * @return the associated CN, null when none. */ public ChangeNumber getChangeNumber(int key) { ChangeNumber cn = null; DraftCNDBCursor draftCNDBCursor = null; try { draftCNDBCursor = db.openReadCursor(key); cn = draftCNDBCursor.currentChangeNumber(); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getChangeNumber, read: " + " key=" + key + " changeNumber returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception" + e + " " + e.getMessage()); return null; } finally { if (draftCNDBCursor != null) draftCNDBCursor.close(); } return cn; } /** * Get the serviceID associated to a provided key. * @param key the provided key. * @return the serviceID, null when none. */ public String getServiceID(int key) { String sid = null; DraftCNDBCursor draftCNDBCursor = null; try { draftCNDBCursor = db.openReadCursor(key); sid = draftCNDBCursor.currentServiceID(); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getServiceID, read: " + " key=" + key + " serviceID returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception" + e + " " + e.getMessage()); return null; } finally { if (draftCNDBCursor != null) draftCNDBCursor.close(); } return sid; } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1966,11 +1966,14 @@ int firstDraftCN; int lastDraftCN; boolean DraftCNdbIsEmpty; Long newestDate = 0L; DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler(); // Get the first DraftCN from the DraftCNdb firstDraftCN = draftCNDbH.getFirstKey(); HashMap<String,ServerState> domainsServerStateForLastSeqnum = null; ChangeNumber changeNumberForLastSeqnum = null; String domainForLastSeqnum = null; if (firstDraftCN < 1) { DraftCNdbIsEmpty=true; @@ -1992,6 +1995,12 @@ domainsServerStateForLastSeqnum = MultiDomainServerState. splitGenStateToServerStates(lastSeqnumGenState); } // Get the changeNumber associated with the current last DraftCN changeNumberForLastSeqnum = draftCNDbH.getChangeNumber(lastDraftCN); // Get the domain associated with the current last DraftCN domainForLastSeqnum = draftCNDbH.getServiceID(lastDraftCN); } // Domain by domain @@ -2009,29 +2018,33 @@ // for this domain, have the state in the replchangelog // where the last DraftCN update is long ec =0; ServerState domainServerStateForLastSeqnum; if ((domainsServerStateForLastSeqnum == null) || (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null)) if (domainsServerStateForLastSeqnum == null) { domainServerStateForLastSeqnum = new ServerState(); // Count changes of this domain from the beginning of the changelog ec = rsd.getEligibleCount( new ServerState(), crossDomainEligibleCN); } else { domainServerStateForLastSeqnum = domainsServerStateForLastSeqnum.get(rsd.getBaseDn()); ec--; // There are records in the draftDB (so already returned to clients) // BUT // There is nothing related to this domain in the last draft record // (may be this domain was disabled when this record was returned). 
// In that case, are counted the changes from // the date of the most recent change from this last draft record if (newestDate == 0L) { newestDate = changeNumberForLastSeqnum.getTime(); } // And count changes of this domain from the date of the // lastseqnum record (that does not refer to this domain) ec = rsd.getEligibleCount(newestDate, crossDomainEligibleCN); if (domainForLastSeqnum.equalsIgnoreCase(rsd.getBaseDn())) ec--; } // Count the number of (eligible) changes from this place // to the eligible CN (cross server) ec = rsd.getEligibleCount( domainServerStateForLastSeqnum, crossDomainEligibleCN); // the state from which we started is the one BEFORE the lastdraftCN // so we must decrement 1 to the EligibleCount if ((ec>0) && (DraftCNdbIsEmpty==false)) ec--; // cumulates on domains lastDraftCN += ec; opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3467,6 +3467,33 @@ } /** * This methods count the changes, server by server : * - from a start time * - to (inclusive) an end point (the provided endCN). * @param startTime The provided start time. * @param endCN The provided end change number. * @return The number of changes between startTime and endCN. */ public long getEligibleCount(long startTime, ChangeNumber endCN) { long sidRes = 0; long res = 0; // Parses the dbState of the domain , server by server ServerState dbState = this.getDbServerState(); Iterator<Integer> serverIDIterator = dbState.iterator(); while (serverIDIterator.hasNext()) { // process one sid int sid = serverIDIterator.next(); ChangeNumber startCN = new ChangeNumber(startTime, 0, sid); sidRes += getCount(sid, startCN, endCN); res+=sidRes; } return res; } /** * Get the latest (more recent) trim date of the changelog dbs associated * to this domain. * @return The latest trim date.