opends/src/admin/defn/org/opends/server/admin/std/ExternalChangelogDomainConfiguration.xml
@@ -24,7 +24,7 @@ ! ! ! Copyright 2009 Sun Microsystems, Inc. ! Portions copyright 2011 ForgeRock AS ! Portions copyright 2011-2013 ForgeRock AS ! --> <adm:managed-object name="external-changelog-domain" plural-name="external-changelog-domains" @@ -45,9 +45,9 @@ </adm:profile> <adm:property name="enabled" mandatory="true"> <adm:synopsis> Indicates whether the <adm:user-friendly-name /> is enabled. Indicates whether the <adm:user-friendly-name /> is enabled. To enable computing the change numbers, set the Replication Server's "ds-cfg-compute-changenumber" property to true. </adm:synopsis> <adm:syntax> <adm:boolean /> opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -328,4 +328,31 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="compute-changenumber" mandatory="false"> <adm:synopsis> Whether the replication server will compute changenumbers. </adm:synopsis> <adm:description> This boolean tells the replication server to compute changenumbers for each replicated change by maintaining a changenumber index database. Changenumbers are computed according to http://tools.ietf.org/html/draft-good-ldap-changelog-04. Note this functionality has an impact on CPU, disk accesses and storage. If changenumbers are not required, it is advisable to set this value to false. </adm:description> <adm:default-behavior> <adm:defined> <adm:value>true</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:boolean /> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-compute-changenumber</ldap:name> </ldap:attribute> </adm:profile> </adm:property> </adm:managed-object> opends/src/admin/messages/ExternalChangelogDomainCfgDefn.properties
@@ -5,4 +5,4 @@ property.ecl-include.description=The list of attributes may include wild cards such as "*" and "+" as well as object class references prefixed with an ampersand, for example "@person". The included attributes will be published using the "includedAttributes" operational attribute as a single LDIF value rather like the "changes" attribute. For modify and modifyDN operations the included attributes will be taken from the entry before any changes were applied. property.ecl-include-for-deletes.synopsis=Specifies a list of attributes which should be published with every delete operation change log entry, in addition to those specified by the "ecl-include" property. property.ecl-include-for-deletes.description=This property provides a means for applications to archive entries after they have been deleted. See the description of the "ecl-include" property for further information about how the included attributes are published. property.enabled.synopsis=Indicates whether the External Changelog Domain is enabled. property.enabled.synopsis=Indicates whether the External Changelog Domain is enabled. To enable computing the change numbers, set the Replication Server's "ds-cfg-compute-changenumber" property to true. opends/src/admin/messages/ReplicationServerCfgDefn.properties
@@ -3,6 +3,8 @@ synopsis=Replication Servers publish updates to Directory Servers within a Replication Domain. property.assured-timeout.synopsis=The timeout value when waiting for assured mode acknowledgments. property.assured-timeout.description=Defines the number of milliseconds that the replication server will wait for assured acknowledgments (in either Safe Data or Safe Read assured sub modes) before forgetting them and answer to the entity that sent an update and is waiting for acknowledgment. property.compute-changenumber.synopsis=Whether the replication server will compute changenumbers. property.compute-changenumber.description=This boolean tells the replication server to compute changenumbers for each replicated change by maintaining a changenumber index database. Changenumbers are computed according to http://tools.ietf.org/html/draft-good-ldap-changelog-04. Note this functionality has an impact on CPU, disk accesses and storage. If changenumbers are not required, it is advisable to set this value to false. property.degraded-status-threshold.synopsis=The number of pending changes as threshold value for putting a directory server in degraded status. property.degraded-status-threshold.description=This value represents a number of pending changes a replication server has in queue for sending to a directory server. Once this value is crossed, the matching directory server goes in degraded status. When number of pending changes goes back under this value, the directory server is put back in normal status. 0 means status analyzer is disabled and directory servers are never put in degraded status. property.group-id.synopsis=The group id for the replication server. opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -31,6 +31,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import org.opends.messages.Category; import org.opends.messages.Message; @@ -54,14 +56,14 @@ /** * The list of (domain service id, ServerState). */ private Map<DN, ServerState> list; private final ConcurrentMap<DN, ServerState> list; /** * Creates a new empty object. */ public MultiDomainServerState() { list = new TreeMap<DN, ServerState>(); list = new ConcurrentSkipListMap<DN, ServerState>(); } /** @@ -71,10 +73,10 @@ */ public MultiDomainServerState(String mdss) throws DirectoryException { list = splitGenStateToServerStates(mdss); list = new ConcurrentSkipListMap<DN, ServerState>( splitGenStateToServerStates(mdss)); } /** * Empty the object.. * After this call the object will be in the same state as if it @@ -82,10 +84,7 @@ */ public void clear() { synchronized (this) { list.clear(); } list.clear(); } /** @@ -102,22 +101,22 @@ if (csn == null) return false; synchronized(this) ServerState serverState = list.get(baseDN); if (serverState == null) { ServerState oldServerState = list.get(baseDN); if (oldServerState == null) serverState = new ServerState(); final ServerState existingSS = list.putIfAbsent(baseDN, serverState); if (existingSS != null) { oldServerState = new ServerState(); list.put(baseDN, oldServerState); serverState = existingSS; } return oldServerState.update(csn); } return serverState.update(csn); } /** * Update the ServerState of the provided baseDN with the provided server * state. The provided server state will be owned by this instance, so care * must be taken by calling code to duplicate it if needed. * state. * * @param baseDN * The provided baseDN. @@ -126,6 +125,28 @@ */ public void update(DN baseDN, ServerState serverState) { for (CSN csn : serverState) { update(baseDN, csn); } } /** * Replace the ServerState of the provided baseDN with the provided server * state. The provided server state will be owned by this instance, so care * must be taken by calling code to duplicate it if needed. * * @param baseDN * The provided baseDN. * @param serverState * The provided serverState. */ public void replace(DN baseDN, ServerState serverState) { if (serverState == null) { throw new IllegalArgumentException("ServerState must not be null"); } list.put(baseDN, serverState); } opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -841,8 +841,8 @@ domain.registerHandler(mh); newDomainCtxt.mh = mh; previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(), newDomainCtxt.startState.duplicate()); previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(), newDomainCtxt.startState.duplicate()); results.add(newDomainCtxt); } @@ -1260,13 +1260,9 @@ final DomainContext oldestContext = findDomainCtxtWithOldestChange(); if (oldestContext != null) { final ECLUpdateMsg change = newECLUpdateMsg(oldestContext); oldestContext.currentState.update(change.getUpdateMsg().getCSN()); if (draftCompat) { assignNewChangeNumberAndStore(change); } oldestChange = change; oldestChange = newECLUpdateMsg(oldestContext); oldestContext.currentState.update( oldestChange.getUpdateMsg().getCSN()); } } } @@ -1326,7 +1322,7 @@ * if a database problem occurs. */ private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange) throws ChangelogException throws ChangelogException, DirectoryException { // We also need to check if the CNIndexDB is consistent with the replicaDBs. // If not, 2 potential reasons: @@ -1337,15 +1333,8 @@ CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN(); DN baseDNFromReplicaDB = replicaDBChange.getBaseDN(); while (true) while (!isEndOfCNIndexDBReached) { if (isEndOfCNIndexDBReached) { // we are at the end of the CNIndexDB in the append mode assignNewChangeNumberAndStore(replicaDBChange); return true; } final ChangeNumberIndexRecord currentRecord = cnIndexDBCursor.getRecord(); final CSN csnFromCNIndexDB = currentRecord.getCSN(); final DN baseDNFromCNIndexDB = currentRecord.getBaseDN(); @@ -1366,6 +1355,9 @@ + currentRecord.getChangeNumber() + " to change=" + replicaDBChange); previousCookie = new MultiDomainServerState(currentRecord.getPreviousCookie()); replicaDBChange.setCookie(previousCookie); replicaDBChange.setChangeNumber(currentRecord.getChangeNumber()); return true; } @@ -1411,6 +1403,7 @@ // continuously throws ChangelogExceptions } } return false; } private Date asDate(CSN csn) @@ -1425,18 +1418,6 @@ return sameDN && sameCSN; } private void assignNewChangeNumberAndStore(ECLUpdateMsg change) throws ChangelogException { final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie.toString(), change.getBaseDN(), change.getUpdateMsg().getCSN()); // store in CNIndexDB the pair // (change number of the current change, state before this change) change.setChangeNumber( replicationServer.getChangeNumberIndexDB().addRecord(record)); } /** * Terminates the first (non persistent) phase of the search on the ECL. */ opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -144,9 +144,7 @@ throws ConfigException { this.config = configuration; this.changelogDB = new JEChangelogDB(this, configuration.getReplicationDBDirectory()); this.changelogDB = new JEChangelogDB(this, configuration); replSessionSecurity = new ReplSessionSecurity(); initialize(); @@ -764,6 +762,9 @@ public ConfigChangeResult applyConfigurationChange( ReplicationServerCfg configuration) { ResultCode resultCode = ResultCode.SUCCESS; boolean adminActionRequired = false; // Some of those properties change don't need specific code. // They will be applied for next connections. Some others have immediate // effect @@ -779,6 +780,20 @@ { this.changelogDB.setPurgeDelay(getTrimAge()); } final boolean computeCN = config.isComputeChangenumber(); if (computeCN != oldConfig.isComputeChangenumber()) { try { this.changelogDB.setComputeChangeNumber(computeCN); } catch (ChangelogException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); resultCode = ResultCode.OPERATIONS_ERROR; } } // changing the listen port requires to stop the listen thread // and restart it. @@ -800,10 +815,14 @@ } catch (IOException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString())); } catch (InterruptedException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString())); } } @@ -849,10 +868,9 @@ final String newDir = config.getReplicationDBDirectory(); if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory())) { return new ConfigChangeResult(ResultCode.SUCCESS, true); adminActionRequired = true; } return new ConfigChangeResult(ResultCode.SUCCESS, false); return new ConfigChangeResult(resultCode, adminActionRequired); } /** @@ -1505,7 +1523,7 @@ public MultiDomainServerState getNewestECLCookie(Set<String> excludedBaseDNs) { // Initialize start state for all running domains with empty state MultiDomainServerState result = new MultiDomainServerState(); final MultiDomainServerState result = new MultiDomainServerState(); for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) { if (contains(excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString())) @@ -1513,7 +1531,7 @@ final ServerState latestDBServerState = rsDomain.getLatestServerState(); if (latestDBServerState.isEmpty()) continue; result.update(rsDomain.getBaseDN(), latestDBServerState); result.replace(rsDomain.getBaseDN(), latestDBServerState); } return result; } opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -57,6 +57,19 @@ void setPurgeDelay(long delayInMillis); /** * Sets whether the replication database must compute change numbers for * replicated changes. Change numbers are computed using a separate new * thread. * * @param computeChangeNumber * whether to compute change numbers for replicated changes * @throws ChangelogException * If a database problem happened */ void setComputeChangeNumber(boolean computeChangeNumber) throws ChangelogException; /** * Shutdown the replication database. * * @throws ChangelogException opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; @@ -43,6 +44,9 @@ import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.util.StaticUtils; import com.forgerock.opendj.util.Pair; import static org.opends.server.loggers.debug.DebugLogger.*; @@ -57,6 +61,11 @@ /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * If this is true, then the {@link #run()} method must clear its state. * Otherwise the run method executes normally. */ private final AtomicBoolean doClear = new AtomicBoolean(); private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; @@ -101,12 +110,12 @@ new MultiDomainServerState(); /** * Composite cursor across all the replicaDBs for all the replication domains. * It is volatile to ensure it supports concurrent update. Each time it is * used more than once in a method, the method must take a local copy to * ensure the cursor does not get updated in the middle of the method. * Cursor across all the replicaDBs for all the replication domains. It is * positioned on the next change that needs to be inserted in the CNIndexDB. * <p> * Note: it is only accessed from the {@link #run()} method. */ private volatile CompositeDBCursor<DN> crossDomainDBCursor; private CompositeDBCursor<DN> nextChangeForInsertDBCursor; /** * New cursors for this Map must be created from the {@link #run()} method, @@ -116,9 +125,27 @@ */ private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); /** This map can be updated by multiple threads. */ private ConcurrentMap<CSN, DN> newCursors = new ConcurrentSkipListMap<CSN, DN>(); /** * Holds the newCursors that will have to be created in the next iteration * inside the {@link #run()} method. * <p> * This map can be updated by multiple threads. */ private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( new Comparator<Pair<DN, Integer>>() { @Override public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2) { final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); if (compareBaseDN == 0) { return o1.getSecond().compareTo(o2.getSecond()); } return compareBaseDN; } }); /** * Builds a ChangeNumberIndexer object. @@ -164,7 +191,8 @@ { final CSN csn = updateMsg.getCSN(); lastSeenUpdates.update(baseDN, csn); newCursors.put(csn, baseDN); // only keep the oldest CSN that will be the new cursor's starting point newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); tryNotify(baseDN); } @@ -210,17 +238,23 @@ return true; } /** * Restores in memory data needed to build the CNIndexDB, including the medium * consistency point. */ private void initialize() throws ChangelogException, DirectoryException { final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); if (newestRecord != null) { // restore the mediumConsistencyRUV from DB mediumConsistencyRUV.update( new MultiDomainServerState(newestRecord.getPreviousCookie())); } // initialize the cross domain DB cursor // initialize the DB cursor and the last seen updates // to ensure the medium consistency CSN can move forward final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); for (Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) @@ -235,12 +269,12 @@ ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); lastSeenUpdates.update(baseDN, latestKnownState); } resetNextChangeForInsertDBCursor(); crossDomainDBCursor = newCompositeDBCursor(); if (newestRecord != null) { // restore the "previousCookie" state before shutdown final UpdateMsg record = crossDomainDBCursor.getRecord(); final UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); if (!record.getCSN().equals(newestRecord.getCSN())) { // TODO JNR i18n safety check, should never happen @@ -248,14 +282,14 @@ + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN())); } mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); crossDomainDBCursor.next(); nextChangeForInsertDBCursor.next(); } // this will not be used any more. Discard for garbage collection. this.changelogState = null; } private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException private void resetNextChangeForInsertDBCursor() throws ChangelogException { final Map<DBCursor<UpdateMsg>, DN> cursors = new HashMap<DBCursor<UpdateMsg>, DN>(); @@ -270,7 +304,7 @@ } final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors); result.next(); return result; nextChangeForInsertDBCursor = result; } private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn) @@ -286,13 +320,27 @@ if (cursor == null) { final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); cursor = domainDB.getCursorFrom(baseDN, serverId, csn); // use an older CSN because getCursorFrom() starts after the given CSN final CSN anOlderCSN = getPrecedingCSN(csn); cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN); map.put(serverId, cursor); return false; } return true; } /** * Returns the immediately preceding CSN. */ private CSN getPrecedingCSN(CSN csn) { if (csn.getSeqnum() > 0) { return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); } return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); } /** {@inheritDoc} */ @Override public void run() @@ -305,83 +353,96 @@ * used. */ initialize(); } catch (DirectoryException e) { // TODO JNR error message i18n if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); return; while (!isShutdownInitiated()) { try { if (doClear.get()) { removeAllCursors(); // No need to use CAS here because it is only for unit tests and at // this point all will have been cleaned up anyway. doClear.set(false); } else { createNewCursors(); } final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); if (msg == null) { synchronized (this) { wait(); } // advance cursor, success/failure will be checked later nextChangeForInsertDBCursor.next(); // loop to check whether new changes have been added to the // ReplicaDBs continue; } final CSN csn = msg.getCSN(); final DN baseDN = nextChangeForInsertDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, thread will be blocked if (!canMoveForwardMediumConsistencyPoint(baseDN)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!canMoveForwardMediumConsistencyPoint(baseDN)) { wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs continue; } } } // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB. // Next if statement is ugly but ensures the first change will not be // immediately trimmed from the CNIndexDB. Yuck! if (mediumConsistencyRUV.isEmpty()) { mediumConsistencyRUV.replace(baseDN, new ServerState()); } final String previousCookie = mediumConsistencyRUV.toString(); final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie, baseDN, csn); changelogDB.getChangeNumberIndexDB().addRecord(record); moveForwardMediumConsistencyPoint(csn, baseDN); // advance cursor, success/failure will be checked later nextChangeForInsertDBCursor.next(); } catch (InterruptedException ignored) { // was shutdown called? loop to figure it out. Thread.currentThread().interrupt(); } } removeAllCursors(); } catch (ChangelogException e) { // TODO JNR error message i18n if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); return; // TODO JNR error message i18n } while (!isShutdownInitiated()) catch (DirectoryException e) { try { createNewCursors(); final UpdateMsg msg = crossDomainDBCursor.getRecord(); if (msg == null) { synchronized (this) { wait(); } // advance cursor, success/failure will be checked later crossDomainDBCursor.next(); // loop to check whether new changes have been added to the ReplicaDBs continue; } final CSN csn = msg.getCSN(); final DN baseDN = crossDomainDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, thread will be blocked if (!canMoveForwardMediumConsistencyPoint(baseDN)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!canMoveForwardMediumConsistencyPoint(baseDN)) { wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs continue; } } } // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB final String previousCookie = mediumConsistencyRUV.toString(); final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie, baseDN, csn); changelogDB.getChangeNumberIndexDB().addRecord(record); moveForwardMediumConsistencyPoint(csn, baseDN); // advance cursor, success/failure will be checked later crossDomainDBCursor.next(); } catch (ChangelogException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); // TODO JNR error message i18n } catch (InterruptedException ignored) { // was shutdown called? } if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); // TODO JNR error message i18n } } @@ -402,20 +463,32 @@ } } private void removeAllCursors() throws ChangelogException { for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) { StaticUtils.close(map.values()); } allCursors.clear(); newCursors.clear(); resetNextChangeForInsertDBCursor(); } private void removeCursor(final DN baseDN, final CSN csn) { for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors .entrySet()) for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 : allCursors.entrySet()) { if (baseDN.equals(entry.getKey())) if (baseDN.equals(entry1.getKey())) { final Set<Integer> serverIds = entry.getValue().keySet(); for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();) for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = entry1.getValue().entrySet().iterator(); iter.hasNext();) { final int serverId = iter.next(); if (csn.getServerId() == serverId) final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); if (csn.getServerId() == entry2.getKey()) { iter.remove(); StaticUtils.close(entry2.getValue()); return; } } @@ -428,12 +501,13 @@ if (!newCursors.isEmpty()) { boolean newCursorAdded = false; for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator(); iter.hasNext();) for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = newCursors.entrySet().iterator(); iter.hasNext();) { final Entry<CSN, DN> entry = iter.next(); final CSN csn = entry.getKey(); if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null)) final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); final DN baseDN = entry.getKey().getFirst(); final CSN csn = entry.getValue(); if (!ensureCursorExists(baseDN, csn.getServerId(), csn)) { newCursorAdded = true; } @@ -441,9 +515,29 @@ } if (newCursorAdded) { crossDomainDBCursor = newCompositeDBCursor(); resetNextChangeForInsertDBCursor(); } } } /** * Asks the current thread to clear its state. * <p> * This method is only useful for unit tests. */ public void clear() { doClear.set(true); synchronized (this) { notify(); } while (doClear.get()) { // wait until clear() has been done by thread // ensures unit tests wait that this thread's state is cleaned up Thread.yield(); } } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -295,6 +295,30 @@ replicationServer.shutdown(); break; } try { trim(shutdown); synchronized (this) { try { wait(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (Exception end) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(end)); logError(mb.toMessage()); if (replicationServer != null) replicationServer.shutdown(); break; } } synchronized (this) @@ -306,9 +330,13 @@ /** * Trim old changes from this database. * @throws ChangelogException In case of database problem. * * @param shutdown * AtomicBoolean telling whether the current run must be stopped * @throws ChangelogException * In case of database problem. */ private void trim(AtomicBoolean shutdown) throws ChangelogException public void trim(AtomicBoolean shutdown) throws ChangelogException { if (trimAge == 0) return; opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; @@ -79,7 +81,7 @@ domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); private ReplicationDbEnv dbEnv; private final String dbDirectoryName; private ReplicationServerCfg config; private final File dbDirectory; /** @@ -89,6 +91,8 @@ * Guarded by cnIndexDBLock */ private JEChangeNumberIndexDB cnIndexDB; private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>(); /** Used for protecting {@link ChangeNumberIndexDB} related state. */ private final Object cnIndexDBLock = new Object(); @@ -131,17 +135,17 @@ * * @param replicationServer * the local replication server. * @param dbDirName * the directory for use by the replication database * @param config * the replication server configuration * @throws ConfigException * if a problem occurs opening the supplied directory */ public JEChangelogDB(ReplicationServer replicationServer, String dbDirName) throws ConfigException public JEChangelogDB(ReplicationServer replicationServer, ReplicationServerCfg config) throws ConfigException { this.config = config; this.replicationServer = replicationServer; this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb"; this.dbDirectory = makeDir(this.dbDirectoryName); this.dbDirectory = makeDir(config.getReplicationDBDirectory()); } private File makeDir(String dbDirName) throws ConfigException @@ -303,9 +307,19 @@ { try { dbEnv = new ReplicationDbEnv( getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer); initializeChangelogState(dbEnv.readChangelogState()); final File dbDir = getFileForPath(config.getReplicationDBDirectory()); dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); final ChangelogState changelogState = dbEnv.readChangelogState(); initializeChangelogState(changelogState); if (config.isComputeChangenumber()) { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); } } } catch (ChangelogException e) { @@ -361,6 +375,12 @@ // - then throw the first encountered exception ChangelogException firstException = null; final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { indexer.initiateShutdown(); cnIndexer.compareAndSet(indexer, null); } try { shutdownCNIndexDB(); @@ -411,6 +431,12 @@ // - then throw the first encountered exception ChangelogException firstException = null; final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { indexer.clear(); } for (DN baseDN : this.domainToReplicaDBs.keySet()) { removeDomain(baseDN); @@ -617,6 +643,11 @@ @Override public void setPurgeDelay(long delay) { final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB; if (cnIndexDB != null) { cnIndexDB.setPurgeDelay(delay); } for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) { for (JEReplicaDB replicaDB : domainMap.values()) @@ -628,6 +659,31 @@ /** {@inheritDoc} */ @Override public void setComputeChangeNumber(boolean computeChangeNumber) throws ChangelogException { final ChangeNumberIndexer indexer; if (computeChangeNumber) { final ChangelogState changelogState = dbEnv.readChangelogState(); indexer = new ChangeNumberIndexer(this, changelogState); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); } } else { indexer = cnIndexer.getAndSet(null); if (indexer != null) { indexer.initiateShutdown(); } } } /** {@inheritDoc} */ @Override public long getDomainLatestTrimDate(DN baseDN) { long latest = 0; @@ -693,7 +749,8 @@ for (int serverId : serverIds) { // get the last already sent CSN from that server to get a cursor final CSN lastCSN = startAfterServerState.getCSN(serverId); final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); } return new CompositeDBCursor<Void>(cursors); @@ -751,6 +808,11 @@ final boolean wasCreated = pair.getSecond(); replicaDB.add(updateMsg); final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { indexer.publishUpdateMsg(baseDN, updateMsg); } return wasCreated; } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -48,6 +48,7 @@ import static com.sleepycat.je.LockMode.*; import static com.sleepycat.je.OperationStatus.*; import static org.opends.messages.JebMessages.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; @@ -246,11 +247,12 @@ } catch (RuntimeException e) { throw new ChangelogException(e); final Message message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage()); throw new ChangelogException(message, e); } catch (DirectoryException e) { throw new ChangelogException(e); throw new ChangelogException(e.getMessageObject(), e); } finally { @@ -439,7 +441,7 @@ } catch (DatabaseException e) { logError(newErrorMessage(null, e)); logError(closeDBErrorMessage(null, e)); } } @@ -452,11 +454,11 @@ } catch (DatabaseException e) { logError(newErrorMessage(db.getDatabaseName(), e)); logError(closeDBErrorMessage(db.getDatabaseName(), e)); } } private Message newErrorMessage(String dbName, DatabaseException e) private Message closeDBErrorMessage(String dbName, DatabaseException e) { if (dbName != null) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -151,6 +151,7 @@ new ReplServerFakeConfiguration( replicationServerPort, "ExternalChangeLogTestDb", 0, 71, 0, maxWindow, null); conf1.setComputeChangenumber(true); replicationServer = new ReplicationServer(conf1); debugInfo("configure", "ReplicationServer created"+replicationServer); @@ -169,6 +170,7 @@ @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"}) public void ECLReplicationServerTest() throws Exception { getCNIndexDB().setPurgeDelay(0); // Following test does not create RSDomain (only broker) but want to test // ECL .. so let's enable ECl manually // Now that we tested that ECl is not available @@ -191,6 +193,7 @@ @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerTest1() throws Exception { getCNIndexDB().setPurgeDelay(0); // Test with a mix of domains, a mix of DSes ECLTwoDomains(); } @@ -205,6 +208,7 @@ @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerTest3() throws Exception { getCNIndexDB().setPurgeDelay(0); // Write changes and read ECL from start ECLCompatWriteReadAllOps(1); @@ -263,6 +267,7 @@ @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerFullTest3() throws Exception { getCNIndexDB().setPurgeDelay(0); // Test all types of ops. ECLAllOps(); // Do not clean the db for the next test @@ -353,6 +358,8 @@ @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerFullTest15() throws Exception { final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); cnIndexDB.setPurgeDelay(0); // Write 4 changes and read ECL from start ECLCompatWriteReadAllOps(1); @@ -373,6 +380,8 @@ ECLCompatTestLimitsAndAdd(1, 8, 4); // Test CNIndexDB is purged when replication change log is purged cnIndexDB.setPurgeDelay(1); cnIndexDB.trim(null); ECLPurgeCNIndexDBAfterChangelogClear(); // Test first and last are updated @@ -1949,6 +1958,16 @@ clearChangelogDB(replicationServer); } @AfterTest public void setPurgeDelayToInitialValue() throws Exception { JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); if (cnIndexDB != null) { cnIndexDB.setPurgeDelay(1); } } /** * After the tests stop the replicationServer. */ @@ -2593,6 +2612,15 @@ debugInfo(tn, "Ending test with success"); } private JEChangeNumberIndexDB getCNIndexDB() { if (replicationServer != null) { return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); } return null; } private void ECLGetEligibleCountTest() throws Exception { String tn = "ECLGetEligibleCountTest"; @@ -2604,6 +2632,7 @@ final CSN csn2 = csns[1]; final CSN csn3 = csns[2]; getCNIndexDB().setPurgeDelay(0); ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); // this empty state will force to count from the start of the DB final ServerState fromStart = new ServerState(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -40,6 +40,7 @@ * This Class implements an object that can be used to instantiate * The ReplicationServer class for tests purpose. */ @SuppressWarnings("javadoc") public class ReplServerFakeConfiguration implements ReplicationServerCfg { private int port; @@ -67,6 +68,7 @@ /** The monitoring publisher period. */ private long monitoringPeriod = 3000; private boolean computeChangenumber; /** * Constructor without group id, assured info and weight @@ -140,15 +142,17 @@ /** * {@inheritDoc} */ @Override public void addChangeListener( ConfigurationChangeListener<ReplicationServerCfg> listener) { // not supported } /** * {@inheritDoc} */ @Override public Class<? extends ReplicationServerCfg> configurationClass() { return null; @@ -157,6 +161,7 @@ /** * {@inheritDoc} */ @Override public String getReplicationDBDirectory() { return dirName; @@ -165,6 +170,7 @@ /** * {@inheritDoc} */ @Override public int getReplicationPort() { return port; @@ -173,6 +179,7 @@ /** * {@inheritDoc} */ @Override public long getReplicationPurgeDelay() { return purgeDelay; @@ -181,6 +188,7 @@ /** * {@inheritDoc} */ @Override public SortedSet<String> getReplicationServer() { return servers; @@ -189,6 +197,7 @@ /** * {@inheritDoc} */ @Override public int getReplicationServerId() { return serverId; @@ -197,6 +206,7 @@ /** * {@inheritDoc} */ @Override public int getQueueSize() { return queueSize; @@ -205,6 +215,7 @@ /** * {@inheritDoc} */ @Override public int getWindowSize() { return windowSize; @@ -213,15 +224,17 @@ /** * {@inheritDoc} */ @Override public void removeChangeListener( ConfigurationChangeListener<ReplicationServerCfg> listener) { // not supported } /** * {@inheritDoc} */ @Override public DN dn() { return null; @@ -234,16 +247,19 @@ return null; } @Override public int getGroupId() { return groupId; } @Override public long getAssuredTimeout() { return assuredTimeout; } @Override public int getDegradedStatusThreshold() { return degradedStatusThreshold; @@ -254,22 +270,31 @@ this.degradedStatusThreshold = degradedStatusThreshold; } @Override public int getWeight() { return weight; } @Override public long getMonitoringPeriod() { return monitoringPeriod; } /** * @param monitoringPeriod the monitoringPeriod to set */ public void setMonitoringPeriod(long monitoringPeriod) { this.monitoringPeriod = monitoringPeriod; } @Override public boolean isComputeChangenumber() { return computeChangenumber; } public void setComputeChangenumber(boolean computeChangenumber) { this.computeChangenumber = computeChangenumber; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -402,6 +402,11 @@ { final ReplicatedUpdateMsg msg = msgs[i]; final ChangeNumberIndexRecord record = allValues.get(i); if (previousCookie.isEmpty()) { // ugly hack to go round strange legacy code previousCookie.replace(record.getBaseDN(), new ServerState()); } // check content in order String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">"; assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());