opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -518,4 +518,31 @@ { return saved; } /** * Build a copy of the ServerState with only ChangeNumbers older than * a specific ChangeNumber. This is used when building the initial * Cookie in the External Changelog, to cope with purged changes. * @param cn The ChangeNumber to compare the ServerState with * @return a copy of the ServerState which only contains the ChangeNumbers * older than cn. */ public ServerState duplicateOnlyOlderThan(ChangeNumber cn) { ServerState newState = new ServerState(); synchronized (list) { for (Integer key : list.keySet()) { ChangeNumber change = list.get(key); Integer id = change.getServerId(); if (change.older(cn)) { newState.list.put(id,change); } } } return newState; } } opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -466,6 +466,13 @@ ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0); // Find the last changeNumber before the trimDate, in the Database. ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate); if (lastBeforeTrimDate != null) { // If we found it, we want to stop trimming when reaching it. trimDate = lastBeforeTrimDate; } // In case of deadlock detection by the Database, this thread can // by aborted by a DeadlockException. This is a transient error and // the transaction should be attempted again. opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -668,6 +668,34 @@ } /** * Getter for the integer value of the current curson, representing * the current DraftChangeNumber being processed. * * @return the current DraftCN as an integer. */ public int currentKey() { try { OperationStatus status = cursor.getCurrent(key, entry, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { return -1; } String str = decodeUTF8(key.getData()); int draftCN = new Integer(str); return draftCN; } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } return -1; } /** * Returns the replication changeNumber associated with the current key. * @return the replication changeNumber */ opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -389,18 +389,22 @@ // We don't use the returned endState but it's updating CN as // reading domain.getEligibleState(crossDomainEligibleCN, false); domain.getEligibleState(crossDomainEligibleCN); ChangeNumber fcn = startState.getMaxChangeNumber( cn.getServerId()); if (cn.older(fcn)) int currentKey = cursor.currentKey(); // Do not delete the lastKey. This should allow us to // preserve last change number over time. if ((currentKey != lastkey) && (cn.older(fcn))) { size++; cursor.delete(); } else { firstkey = currentKey; finished = true; } } opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -182,8 +182,8 @@ + ")" + "] [nextNonEligibleMsg=" + nextNonEligibleMsg + "] [startState=" + startState + "] [stopState= " + stopState + "] [currentState= " + currentState + "]]"); "] [stopState=" + stopState + "] [currentState=" + currentState + "]]"); } /** @@ -749,7 +749,7 @@ if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) { newDomainCtxt.startState = rsd.getEligibleState(eligibleCN, true); newDomainCtxt.startState = rsd.getEligibleState(eligibleCN); startStatesFromProvidedCookie.remove(rsd.getBaseDn()); } else @@ -766,7 +766,12 @@ // let's start traversing this domain from the beginning of // what we have in the replication changelog if (newDomainCtxt.startState == null) newDomainCtxt.startState = new ServerState(); { ChangeNumber latestTrimCN = new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0,0); newDomainCtxt.startState = rsd.getStartState() .duplicateOnlyOlderThan(latestTrimCN); } } else { @@ -807,7 +812,7 @@ } // Set the stop state for the domain from the eligibleCN newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN, true); newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN); } newDomainCtxt.currentState = new ServerState(); @@ -822,9 +827,8 @@ rsd.registerHandler(mh); newDomainCtxt.mh = mh; previousCookie.update( newDomainCtxt.rsd.getBaseDn(), newDomainCtxt.startState); previousCookie.update(newDomainCtxt.rsd.getBaseDn(), newDomainCtxt.startState); // store the new context tmpSet.add(newDomainCtxt); @@ -1052,6 +1056,7 @@ ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_SYNTAX.get()); } excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs(); replicationServer.disableEligibility(excludedServiceIDs); eligibleCN = replicationServer.getEligibleCN(); opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -450,6 +450,12 @@ */ cn = null; } else { str = decodeUTF8(key.getData()); cn= new ChangeNumber(str); // There can't be 2 counter record next to each other } } } catch (DatabaseException e) @@ -470,6 +476,110 @@ } /** * Try to find in the DB, the change number right before the one * passed as a parameter. * * @param changeNumber * The changeNumber from which we start searching. * @return the changeNumber right before the one passed as a parameter. * Can return null if there is none. */ public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber) { if (changeNumber == null) return null; Cursor cursor = null; ChangeNumber cn = null; DatabaseEntry key = new ReplicationKey(changeNumber); DatabaseEntry data = new DatabaseEntry(); Transaction txn = null; dbCloseLock.readLock().lock(); try { cursor = db.openCursor(txn, null); if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { // We can move close to the changeNumber. // Let's move to the previous change. if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // database starts with a counter record. cn = null; } else { str = decodeUTF8(key.getData()); cn= new ChangeNumber(str); // There can't be 2 counter record next to each other } } } // else, there was no change previous to our changeNumber. } else { // We could not move the cursor past to the changeNumber // Check if the last change is older than changeNumber if (cursor.getLast(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* * database only contain a counter record, should not be * possible, but Ok, let's just say no change Number */ cn = null; } else { str = decodeUTF8(key.getData()); cn= new ChangeNumber(str); // There can't be 2 counter record next to each other } } } } } catch (DatabaseException e) { /* database is faulty */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); // TODO: Verify if shutting down replication is the right thing to do replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } return cn; } /** * {@inheritDoc} */ @Override opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1977,7 +1977,7 @@ continue; result.update(rsd.getBaseDn(), rsd.getEligibleState( getEligibleCN(),false)); getEligibleCN())); } } return result; opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3153,12 +3153,9 @@ * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31 * * @param eligibleCN The provided eligibleCN. * @param allowOlderThanPurgeDate When true, the returned state can be older * than the purge date of the domain. * @return The computed eligible server state. */ public ServerState getEligibleState(ChangeNumber eligibleCN, boolean allowOlderThanPurgeDate) public ServerState getEligibleState(ChangeNumber eligibleCN) { ServerState result = new ServerState(); @@ -3225,24 +3222,6 @@ } } if (allowOlderThanPurgeDate == false) { boolean domainPurged = true; long latestDomainTrimDate = getLatestDomainTrimDate(); Iterator<Integer> it = result.iterator(); while (it.hasNext()) { int sid = it.next(); ChangeNumber cn = result.getMaxChangeNumber(sid); if ((cn.getTime()>0) && (cn.getTime()<latestDomainTrimDate)) result.update(new ChangeNumber(0,0,sid)); else domainPurged = false; } if (domainPurged == true) result.clear(); } if (debugEnabled()) TRACER.debugInfo("In " + this + " getEligibleState() result is " + result);