opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,9 +41,9 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; import org.opends.server.replication.server.changelog.je.DraftCNDbIterator; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; @@ -66,8 +66,8 @@ */ private String operationId; /** Iterator on the draftCN database. */ private DraftCNDbIterator draftCNDbIter = null; /** Iterator on the changelogDB database. */ private ChangelogDBIterator changelogDBIter = null; private boolean draftCompat = false; /** @@ -99,8 +99,7 @@ * currently processed (thus becoming the "current" cookie just * before the change is returned. */ private MultiDomainServerState previousCookie = new MultiDomainServerState(); private MultiDomainServerState previousCookie = new MultiDomainServerState(); /** * Specifies the excluded DNs (like cn=admin, ...). 
*/ @@ -563,16 +562,16 @@ * @throws DirectoryException * if a database problem occurred */ private String findCookie(int startDraftCN) throws ChangelogException, private String findCookie(final int startDraftCN) throws ChangelogException, DirectoryException { DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); ChangelogDB changelogDB = replicationServer.getChangelogDB(); if (startDraftCN <= 1) { // Request filter DOES NOT contain any firstDraftCN // So we'll generate from the first DraftCN in the DraftCNdb if (draftCNDb.isEmpty()) if (changelogDB.isEmpty()) { // FIXME JNR if we find a way to make draftCNDb.isEmpty() a non costly // operation, then I think we can move this check to the top of this @@ -581,21 +580,20 @@ return null; } final int firstKey = draftCNDb.getFirstKey(); String crossDomainStartState = draftCNDb.getValue(firstKey); draftCNDbIter = draftCNDb.generateIterator(firstKey); final int firstKey = changelogDB.getFirstKey(); String crossDomainStartState = changelogDB.getPreviousCookie(firstKey); changelogDBIter = changelogDB.generateIterator(firstKey); return crossDomainStartState; } // Request filter DOES contain a startDraftCN // Read the draftCNDb to see whether it contains startDraftCN final int startDraftCNKey = startDraftCN; String crossDomainStartState = draftCNDb.getValue(startDraftCNKey); String crossDomainStartState = changelogDB.getPreviousCookie(startDraftCN); if (crossDomainStartState != null) { // found the provided startDraftCN, let's return it draftCNDbIter = draftCNDb.generateIterator(startDraftCNKey); changelogDBIter = changelogDB.generateIterator(startDraftCN); return crossDomainStartState; } @@ -614,10 +612,10 @@ // the DB, let's use the lower limit. 
if (startDraftCN < firstDraftCN) { crossDomainStartState = draftCNDb.getValue(firstDraftCN); crossDomainStartState = changelogDB.getPreviousCookie(firstDraftCN); if (crossDomainStartState != null) { draftCNDbIter = draftCNDb.generateIterator(firstDraftCN); changelogDBIter = changelogDB.generateIterator(firstDraftCN); return crossDomainStartState; } @@ -629,15 +627,15 @@ { // startDraftCN is between first and potential last and has never // been returned yet if (draftCNDb.isEmpty()) if (changelogDB.isEmpty()) { isEndOfDraftCNReached = true; return null; } final int lastKey = draftCNDb.getLastKey(); crossDomainStartState = draftCNDb.getValue(lastKey); draftCNDbIter = draftCNDb.generateIterator(lastKey); final int lastKey = changelogDB.getLastKey(); crossDomainStartState = changelogDB.getPreviousCookie(lastKey); changelogDBIter = changelogDB.generateIterator(lastKey); return crossDomainStartState; // TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ... @@ -895,7 +893,7 @@ public void shutdown() { if (debugEnabled()) TRACER.debugInfo(this + " shutdown()" + draftCNDbIter); TRACER.debugInfo(this + " shutdown()"); releaseIterator(); for (DomainContext domainCtxt : domainCtxts) { if (!domainCtxt.unRegisterHandler()) { @@ -911,10 +909,10 @@ private void releaseIterator() { if (this.draftCNDbIter != null) if (this.changelogDBIter != null) { this.draftCNDbIter.releaseCursor(); this.draftCNDbIter = null; this.changelogDBIter.close(); this.changelogDBIter = null; } } @@ -1374,8 +1372,8 @@ // the next change from the DraftCN db ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); String dnFromDraftCNDb = draftCNDbIter.getBaseDN(); ChangeNumber cnFromDraftCNDb = changelogDBIter.getChangeNumber(); String dnFromDraftCNDb = changelogDBIter.getBaseDN(); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " @@ -1390,10 +1388,10 @@ { if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " assigning draftCN=" + 
draftCNDbIter.getDraftCN() + " assigning draftCN=" + changelogDBIter.getDraftCN() + " to change=" + oldestChange); oldestChange.setDraftChangeNumber(draftCNDbIter.getDraftCN()); oldestChange.setDraftChangeNumber(changelogDBIter.getDraftCN()); return true; } @@ -1422,12 +1420,12 @@ + " will skip " + cnFromDraftCNDb + " and read next change from the DraftCNDb."); isEndOfDraftCNReached = !draftCNDbIter.next(); isEndOfDraftCNReached = !changelogDBIter.next(); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " has skipped to " + " sn=" + draftCNDbIter.getDraftCN() + " cn=" + draftCNDbIter.getChangeNumber() + " has skipped to " + " sn=" + changelogDBIter.getDraftCN() + " cn=" + changelogDBIter.getChangeNumber() + " End of draftCNDb ?" + isEndOfDraftCNReached); } catch (ChangelogException e) @@ -1456,10 +1454,10 @@ // generate a new draftCN and assign to this change change.setDraftChangeNumber(replicationServer.getNewDraftCN()); // store in DraftCNdb the pair // store in changelogDB the pair // (DraftCN of the current change, state before this change) DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); draftCNDb.add( ChangelogDB changelogDB = replicationServer.getChangelogDB(); changelogDB.add( change.getDraftChangeNumber(), previousCookie.toString(), change.getBaseDN(), opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,6 +52,7 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; @@ -65,6 +66,7 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.types.ResultCode.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; @@ -143,23 +145,22 @@ private long monitoringPublisherPeriod = 3000; /** * The handler of the draft change numbers database, the database used to * store the relation between a draft change number ('seqnum') and the * associated cookie. * The handler of the changelog database, the database stores the relation * between a draft change number ('seqnum') and the associated cookie. * <p> * Guarded by draftCNLock * Guarded by changelogDBLock */ private DraftCNDbHandler draftCNDbHandler; private ChangelogDB changelogDB; /** * The last value generated of the draft change number. * <p> * Guarded by draftCNLock * Guarded by changelogDBLock **/ private int lastGeneratedDraftCN = 0; /** Used for protecting draft CN related state. */ private final Object draftCNLock = new Object(); /** Used for protecting changelogDB related state. */ private final Object changelogDBLock = new Object(); /** * The tracer object for the debug logger. @@ -183,7 +184,7 @@ private long domainTicket = 0L; /** BaseDNs excluded for ECL. 
*/ private Collection<String> excludedBaseDNs = new ArrayList<String>(); private Set<String> excludedBaseDNs = new HashSet<String>(); /** * The weight affected to the replication server. @@ -470,7 +471,7 @@ private Set<String> getConnectedRSUrls(ReplicationServerDomain domain) { Set<String> results = new LinkedHashSet<String>(); Set<String> results = new HashSet<String>(); for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values()) { results.add(normalizeServerURL(rsHandler.getServerAddressURL())); @@ -714,11 +715,11 @@ eclwe.finalizeWorkflowElement(); } synchronized (draftCNLock) synchronized (changelogDBLock) { if (draftCNDbHandler != null) if (changelogDB != null) { draftCNDbHandler.shutdown(); changelogDB.shutdown(); } } } @@ -900,42 +901,39 @@ { dbEnv.clearGenerationId(baseDn); } catch (Exception e) catch (Exception ignored) { // Ignore. if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, e); TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } synchronized (draftCNLock) synchronized (changelogDBLock) { if (draftCNDbHandler != null) if (changelogDB != null) { try { draftCNDbHandler.clear(baseDn); changelogDB.clear(baseDn); } catch (Exception e) catch (Exception ignored) { // Ignore. if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, e); TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } try { lastGeneratedDraftCN = draftCNDbHandler.getLastKey(); lastGeneratedDraftCN = changelogDB.getLastKey(); } catch (Exception e) catch (Exception ignored) { // Ignore. 
if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, e); TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } } @@ -1352,12 +1350,10 @@ public void processExportBegin(Backend backend, LDIFExportConfig config) { if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ " Export starts"); TRACER.debugInfo("RS " + getMonitorInstanceName() + " Export starts"); if (backend.getBackendID().equals(backendId)) { // Retrieves the backend related to this replicationServerDomain // backend = ReplicationBackend b = (ReplicationBackend)DirectoryServer.getBackend(backendId); b.setServer(this); @@ -1394,38 +1390,36 @@ rsd.clearDbs(); } synchronized (draftCNLock) synchronized (changelogDBLock) { if (draftCNDbHandler != null) if (changelogDB != null) { try { draftCNDbHandler.clear(); changelogDB.clear(); } catch (Exception e) catch (Exception ignored) { // Ignore. if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, e); TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } try { draftCNDbHandler.shutdown(); changelogDB.shutdown(); } catch (Exception e) catch (Exception ignored) { // Ignore. 
if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, e); TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } lastGeneratedDraftCN = 0; draftCNDbHandler = null; changelogDB = null; } } } @@ -1614,67 +1608,70 @@ ChangeNumber eligibleCN = null; for (ReplicationServerDomain domain : getReplicationServerDomains()) { if ((excludedBaseDNs != null) && excludedBaseDNs.contains(domain.getBaseDn())) if (contains(excludedBaseDNs, domain.getBaseDn())) continue; ChangeNumber domainEligibleCN = domain.getEligibleCN(); String dates = ""; if (domainEligibleCN != null) { if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN))) final ChangeNumber domainEligibleCN = domain.getEligibleCN(); if (eligibleCN == null || (domainEligibleCN != null && domainEligibleCN.older(eligibleCN))) { eligibleCN = domainEligibleCN; } dates = new Date(domainEligibleCN.getTime()).toString(); } debugLog += "[dn=" + domain.getBaseDn() if (debugEnabled()) { final String dates = domainEligibleCN == null ? "" : new Date(domainEligibleCN.getTime()).toString(); debugLog += "[baseDN=" + domain.getBaseDn() + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]"; } } if (eligibleCN==null) { eligibleCN = new ChangeNumber(TimeThread.getTime(), 0, 0); } if (debugEnabled()) if (debugEnabled()) { TRACER.debugInfo("In " + this + " getEligibleCN() ends with " + " the following domainEligibleCN for each domain :" + debugLog + " thus CrossDomainEligibleCN=" + eligibleCN + " ts=" + new Date(eligibleCN.getTime()).toString()); } return eligibleCN; } private boolean contains(Set<String> col, String elem) { return col != null && col.contains(elem); } /** * Get or create a handler on a Db on DraftCN for external changelog. * Get (or create) a handler on the ChangelogDB for external changelog. * * @return the handler. * @throws DirectoryException * when needed. 
*/ public DraftCNDbHandler getDraftCNDbHandler() throws DirectoryException public ChangelogDB getChangelogDB() throws DirectoryException { synchronized (draftCNLock) synchronized (changelogDBLock) { try { if (draftCNDbHandler == null) if (changelogDB == null) { draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv); changelogDB = new DraftCNDbHandler(this, this.dbEnv); lastGeneratedDraftCN = getLastDraftChangeNumber(); } return draftCNDbHandler; return changelogDB; } catch (Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); MessageBuilder mb = new MessageBuilder(); mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get("")); throw new DirectoryException(ResultCode.OPERATIONS_ERROR, mb.toMessage(), e); throw new DirectoryException(OPERATIONS_ERROR, mb.toMessage(), e); } } } @@ -1685,11 +1682,11 @@ */ public int getFirstDraftChangeNumber() { synchronized (draftCNLock) synchronized (changelogDBLock) { if (draftCNDbHandler != null) if (changelogDB != null) { return draftCNDbHandler.getFirstKey(); return changelogDB.getFirstKey(); } return 0; } @@ -1701,11 +1698,11 @@ */ public int getLastDraftChangeNumber() { synchronized (draftCNLock) synchronized (changelogDBLock) { if (draftCNDbHandler != null) if (changelogDB != null) { return draftCNDbHandler.getLastKey(); return changelogDB.getLastKey(); } return 0; } @@ -1717,7 +1714,7 @@ */ public int getNewDraftCN() { synchronized (draftCNLock) synchronized (changelogDBLock) { return ++lastGeneratedDraftCN; } @@ -1756,12 +1753,11 @@ */ int lastDraftCN; Boolean dbEmpty = false; Long newestDate = 0L; DraftCNDbHandler draftCNDbH = getDraftCNDbHandler(); boolean dbEmpty = false; long newestDate = 0L; ChangelogDB changelogDB = getChangelogDB(); // Get the first DraftCN from the DraftCNdb int firstDraftCN = draftCNDbH.getFirstKey(); int firstDraftCN = changelogDB.getFirstKey(); Map<String,ServerState> domainsServerStateForLastSeqnum = null; ChangeNumber changeNumberForLastSeqnum = null; String domainForLastSeqnum = null; @@ 
-1773,12 +1769,11 @@ } else { // Get the last DraftCN from the DraftCNdb lastDraftCN = draftCNDbH.getLastKey(); lastDraftCN = changelogDB.getLastKey(); // Get the generalized state associated with the current last DraftCN // and initializes from it the startStates table String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN); String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN); if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0)) { domainsServerStateForLastSeqnum = MultiDomainServerState. @@ -1786,16 +1781,16 @@ } // Get the changeNumber associated with the current last DraftCN changeNumberForLastSeqnum = draftCNDbH.getChangeNumber(lastDraftCN); changeNumberForLastSeqnum = changelogDB.getChangeNumber(lastDraftCN); // Get the domain associated with the current last DraftCN domainForLastSeqnum = draftCNDbH.getBaseDN(lastDraftCN); domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN); } // Domain by domain for (ReplicationServerDomain rsd : getReplicationServerDomains()) { if (excludedBaseDNs.contains(rsd.getBaseDn())) if (contains(excludedBaseDNs, rsd.getBaseDn())) continue; // for this domain, have the state in the replchangelog @@ -1860,15 +1855,12 @@ { disableEligibility(excludedBaseDNs); MultiDomainServerState result = new MultiDomainServerState(); // Initialize start state for all running domains with empty state MultiDomainServerState result = new MultiDomainServerState(); for (ReplicationServerDomain rsd : getReplicationServerDomains()) { if ((excludedBaseDNs != null) && (excludedBaseDNs.contains(rsd.getBaseDn()))) continue; if (rsd.getDbServerState().isEmpty()) if (contains(excludedBaseDNs, rsd.getBaseDn()) || rsd.getDbServerState().isEmpty()) continue; result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN())); opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
New file @@ -0,0 +1,150 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;

import org.opends.server.replication.common.ChangeNumber;

/**
 * This interface stores the changelog information into a database. Each
 * record is keyed by a draft change number and carries the previous cookie,
 * the baseDN and the replication change number that were added with it.
 * <p>
 * NOTE(review): this interface extends {@link Runnable}; presumably
 * implementations are driven by a background thread - confirm against the
 * implementing class.
 *
 * @see <a href=
 * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
 * >OpenDJ Domain Names</a> for more information about the changelog.
 */
public interface ChangelogDB extends Runnable
{

  /**
   * Get the CN associated to a provided draft change number.
   *
   * @param draftCN
   *          the provided draft change number.
   * @return the associated CN, null when none.
   */
  ChangeNumber getChangeNumber(int draftCN);

  /**
   * Get the baseDN associated to a provided draft change number.
   *
   * @param draftCN
   *          the provided draft change number.
   * @return the baseDN, null when none.
   */
  String getBaseDN(int draftCN);

  /**
   * Get the previous cookie associated to a provided draft change number.
   *
   * @param draftCN
   *          the provided draft change number.
   * @return the previous cookie, null when none.
   */
  String getPreviousCookie(int draftCN);

  /**
   * Get the first draft change number stored in the DB.
   *
   * @return the first draftCN in the DB.
   */
  int getFirstKey();

  /**
   * Get the last draft change number stored in the DB.
   *
   * @return the last draftCN in the DB.
   */
  int getLastKey();

  /**
   * Add an update to the list of messages that must be saved to the db managed
   * by this db handler.
   * <p>
   * This method is blocking if the size of the list of messages is larger than
   * its maximum.
   *
   * @param draftCN
   *          The draft change number for this record in the db.
   * @param previousCookie
   *          The value of the previous cookie.
   * @param baseDN
   *          The associated baseDN.
   * @param changeNumber
   *          The associated replication change number.
   */
  void add(int draftCN, String previousCookie, String baseDN,
      ChangeNumber changeNumber);

  /**
   * Generate a new {@link ChangelogDBIterator} that allows to browse the db
   * managed by this dbHandler, starting at the position defined by a given
   * draft change number.
   *
   * @param startDraftCN
   *          The draft change number where the iterator must start.
   * @return a new {@link ChangelogDBIterator} that allows to browse the db
   *         managed by this dbHandler, starting at the position defined by
   *         the given draft change number.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  ChangelogDBIterator generateIterator(int startDraftCN)
      throws ChangelogException;

  /**
   * Returns whether this database is empty.
   *
   * @return <code>true</code> if this database is empty, <code>false</code>
   *         otherwise
   */
  boolean isEmpty();

  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
   *           When an exception occurs while removing the changes from the DB.
   */
  void clear() throws ChangelogException;

  /**
   * Clear the changes from this DB (from both memory cache and DB storage) for
   * the provided baseDN.
   *
   * @param baseDNToClear
   *          The baseDN for which we want to remove all records from the
   *          DraftCNDb - null means all.
   * @throws ChangelogException
   *           When an exception occurs while removing the changes from the DB.
   */
  void clear(String baseDNToClear) throws ChangelogException;

  /**
   * Shutdown this dbHandler.
   */
  void shutdown();

}
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDBIterator.java
New file @@ -0,0 +1,78 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;

import java.io.Closeable;

import org.opends.server.replication.common.ChangeNumber;

/**
 * Iterator into the changelog database. Once it is no longer used, a
 * ChangelogDBIterator must be closed to release all the resources it holds
 * in the database.
 * <p>
 * The getters expose the fields of the record the iterator is currently
 * positioned on; {@link #next()} moves it to the following record.
 */
public interface ChangelogDBIterator extends Closeable
{

  /**
   * Getter for the replication change number field.
   *
   * @return The replication change number field.
   */
  ChangeNumber getChangeNumber();

  /**
   * Getter for the baseDN field.
   *
   * @return The baseDN field.
   */
  String getBaseDN();

  /**
   * Getter for the draftCN field.
   *
   * @return The draft CN field.
   */
  int getDraftCN();

  /**
   * Skip to the next record of the database.
   *
   * @return true if there is a next record, false otherwise
   * @throws ChangelogException
   *           When a database exception is raised.
   */
  boolean next() throws ChangelogException;

  /**
   * Release the resources and locks used by this Iterator. This method must be
   * called when the iterator is no longer used. Failure to do so could cause a
   * DB deadlock.
   * <p>
   * This redeclaration narrows {@link Closeable#close()}: it declares no
   * checked exception, so callers can close the iterator without handling
   * {@link java.io.IOException}.
   */
  @Override
  void close();
}
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -35,7 +35,6 @@ import org.opends.messages.MessageBuilder; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.DebugLogLevel; @@ -61,7 +60,6 @@ private Database db = null; private ReplicationDbEnv dbenv = null; private ReplicationServer replicationServer; /** * The lock used to provide exclusive access to the thread that close the db @@ -72,15 +70,12 @@ /** * Creates a new database or open existing database that will be used * to store and retrieve changes from an LDAP server. * @param replicationServer The ReplicationServer that needs to be shutdown. * @param dbenv The Db environment to use to create the db. * @throws ChangelogException If a database problem happened. */ public DraftCNDB(ReplicationServer replicationServer, ReplicationDbEnv dbenv) throws ChangelogException public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException { this.dbenv = dbenv; this.replicationServer = replicationServer; // Get or create the associated ReplicationServerDomain and Db. db = dbenv.getOrCreateDraftCNDb(); opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -45,8 +45,9 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DraftCNDB .DraftCNDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -61,12 +62,13 @@ * server in the topology. * It is responsible for efficiently saving the updates that is received from * each master server into stable storage. * This class is also able to generate a ReplicationIterator that can be * This class is also able to generate a ChangelogDBIterator that can be * used to read all changes from a given ChangeNumber. * * This class publish some monitoring information below cn=monitor. * <p> * This class publishes some monitoring information below <code> * cn=monitor</code>. */ public class DraftCNDbHandler implements Runnable public class DraftCNDbHandler implements ChangelogDB { /** * The tracer object for the debug logger. @@ -114,7 +116,7 @@ this.trimAge = replicationServer.getTrimAge(); // DB initialization db = new DraftCNDB(replicationServer, dbenv); db = new DraftCNDB(dbenv); firstkey = db.readFirstDraftCN(); lastkey = db.readLastDraftCN(); @@ -127,43 +129,31 @@ DirectoryServer.registerMonitorProvider(dbMonitor); } /** * Add an update to the list of messages that must be saved to the db * managed by this db handler. * This method is blocking if the size of the list of message is larger * than its maximum. * @param key The key for this record in the db. * @param value The associated value. * @param baseDN The associated baseDN. 
* @param cn The associated replication change number. */ public synchronized void add(int key, String value, String baseDN, /** {@inheritDoc} */ @Override public synchronized void add(int draftCN, String value, String baseDN, ChangeNumber cn) { db.addEntry(key, value, baseDN, cn); db.addEntry(draftCN, value, baseDN, cn); if (debugEnabled()) TRACER.debugInfo( "In DraftCNDbhandler.add, added: " + " key=" + key + " key=" + draftCN + " value=" + value + " baseDN=" + baseDN + " cn=" + cn); } /** * Get the firstChange. * @return Returns the firstChange. */ /** {@inheritDoc} */ @Override public int getFirstKey() { return db.readFirstDraftCN(); } /** * Get the lastChange. * @return Returns the lastChange. */ /** {@inheritDoc} */ @Override public int getLastKey() { return db.readLastDraftCN(); @@ -179,7 +169,7 @@ } /** * Returns whether this database is empty. * {@inheritDoc} * <p> * FIXME Find a way to implement this method in a more efficient manner. * {@link com.sleepycat.je.Database#count()} javadoc mentions: @@ -193,64 +183,40 @@ * </li> * <li>call <code>db.readFirstDraftCN() != 0</code></li> * </ul> * * @return <code>true</code> if this database is empty, <code>false</code> * otherwise */ @Override public boolean isEmpty() { return count() == 0; } /** * Get a read cursor on the database from a provided key. * The cursor MUST be released after use. * @param key The provided key. * Get a read cursor on the database from a provided key. The cursor MUST be * closed after use. * <p> * This method is only used by unit tests. * * @param startingDraftCN * The draft change number from where to start. * @return the new cursor. * @throws ChangelogException * if a database problem occurs. */ public DraftCNDBCursor getReadCursor(int key) DraftCNDBCursor getReadCursor(int startingDraftCN) throws ChangelogException { try { return db.openReadCursor(key); } catch(Exception e) { return null; } return db.openReadCursor(startingDraftCN); } /** * Release a provided read cursor. 
* @param cursor The provided read cursor. */ public void releaseReadCursor(DraftCNDBCursor cursor) { close(cursor); } /** * Generate a new ReplicationIterator that allows to browse the db * managed by this dbHandler and starting at the position defined * by a given changeNumber. * * @param startDraftCN The position where the iterator must start. * * @return a new ReplicationIterator that allows to browse the db * managed by this dbHandler and starting at the position defined * by a given changeNumber. * * @throws ChangelogException if a database problem happened. */ public DraftCNDbIterator generateIterator(int startDraftCN) /** {@inheritDoc} */ @Override public ChangelogDBIterator generateIterator(int startDraftCN) throws ChangelogException { return new DraftCNDbIterator(db, startDraftCN); } /** * Shutdown this dbHandler. */ /** {@inheritDoc} */ @Override public void shutdown() { if (shutdown) @@ -334,14 +300,8 @@ clear(null); } /** * Clear the changes from this DB (from both memory cache and DB storage) * for the provided baseDN. * @param baseDNToClear The baseDN for which we want to remove * all records from the DraftCNDb - null means all. * @throws ChangelogException When an exception occurs while removing the * changes from the DB. 
*/ /** {@inheritDoc} */ @Override public void clear(String baseDNToClear) throws ChangelogException { if (isEmpty()) @@ -365,11 +325,9 @@ return; } ChangeNumber cn = cursor.currentChangeNumber(); // From the draftCNDb change record, get the domain and changeNumber ChangeNumber cn = cursor.currentChangeNumber(); String baseDN = cursor.currentBaseDN(); if ((baseDNToClear != null) && (baseDNToClear.equalsIgnoreCase(baseDN))) { @@ -379,7 +337,6 @@ ReplicationServerDomain domain = replicationServer .getReplicationServerDomain(baseDN, false); if (domain == null) { // the domain has been removed since the record was written in the @@ -391,8 +348,7 @@ ServerState startState = domain.getStartState(); // We don't use the returned endState but it's updating CN as // reading // We don't use the returned endState but it's updating CN as reading domain.getEligibleState(crossDomainEligibleCN); ChangeNumber fcn = startState.getChangeNumber(cn.getServerId()); @@ -521,11 +477,8 @@ trimAge = delay; } /** * Clear the changes from this DB (from both memory cache and DB storage). * @throws ChangelogException When an exception occurs while removing the * changes from the DB. */ /** {@inheritDoc} */ @Override public void clear() throws ChangelogException { db.clear(); @@ -541,7 +494,7 @@ */ public boolean hasLock() { return (lock.getHoldCount() > 0); return lock.getHoldCount() > 0; } /** @@ -561,28 +514,19 @@ lock.unlock(); } /** * Get the value associated to a provided key. * @param key the provided key. * @return the associated value, null when none. 
*/ public String getValue(int key) /** {@inheritDoc} */ @Override public String getPreviousCookie(int draftCN) { DraftCNDBCursor draftCNDBCursor = null; try { draftCNDBCursor = db.openReadCursor(key); draftCNDBCursor = db.openReadCursor(draftCN); return draftCNDBCursor.currentValue(); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getValue, read: " + " key=" + key + " value returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception " + e + " " + e.getMessage()); debugException("getValue", draftCN, e); return null; } finally @@ -591,28 +535,19 @@ } } /** * Get the CN associated to a provided key. * @param key the provided key. * @return the associated CN, null when none. */ public ChangeNumber getChangeNumber(int key) /** {@inheritDoc} */ @Override public ChangeNumber getChangeNumber(int draftCN) { DraftCNDBCursor draftCNDBCursor = null; try { draftCNDBCursor = db.openReadCursor(key); draftCNDBCursor = db.openReadCursor(draftCN); return draftCNDBCursor.currentChangeNumber(); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getChangeNumber, read: " + " key=" + key + " changeNumber returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception" + e + " " + e.getMessage()); debugException("getChangeNumber", draftCN, e); return null; } finally @@ -621,28 +556,19 @@ } } /** * Get the baseDN associated to a provided key. * @param key the provided key. * @return the baseDN, null when none. 
*/ public String getBaseDN(int key) /**{@inheritDoc}*/ @Override public String getBaseDN(int draftCN) { DraftCNDBCursor draftCNDBCursor = null; try { draftCNDBCursor = db.openReadCursor(key); draftCNDBCursor = db.openReadCursor(draftCN); return draftCNDBCursor.currentBaseDN(); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler.getBaseDN(), read: " + " key=" + key + " baseDN returned is null" + " first=" + db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception" + e + " " + e.getMessage()); debugException("getBaseDN", draftCN, e); return null; } finally @@ -650,4 +576,15 @@ close(draftCNDBCursor); } } private void debugException(String methodName, int draftCN, Exception e) { if (debugEnabled()) TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: " + " key=" + draftCN + " value returned is null" + " first="+ db.readFirstDraftCN() + " last=" + db.readLastDraftCN() + " count=" + db.count() + " exception " + e + " " + e.getMessage()); } } opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -31,6 +31,7 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.je.DraftCNDB .DraftCNDBCursor; import org.opends.server.types.DebugLogLevel; @@ -41,7 +42,7 @@ * This class allows to iterate through the changes received from a given * LDAP Server Identifier. */ public class DraftCNDbIterator public class DraftCNDbIterator implements ChangelogDBIterator { private static final DebugTracer TRACER = getTracer(); private DraftCNDBCursor draftCNDbCursor; @@ -66,11 +67,8 @@ } } /** * Getter for the baseDN field. * * @return The service ID. */ /** {@inheritDoc} */ @Override public String getBaseDN() { try @@ -84,10 +82,8 @@ } } /** * Getter for the replication change number field. * @return The replication change number field. */ /** {@inheritDoc} */ @Override public ChangeNumber getChangeNumber() { try @@ -101,10 +97,8 @@ } } /** * Getter for the draftCN field. * @return The draft CN field. */ /** {@inheritDoc} */ @Override public int getDraftCN() { ReplicationDraftCNKey sk = (ReplicationDraftCNKey) draftCNDbCursor.getKey(); @@ -112,11 +106,8 @@ return currentSeqnum; } /** * Skip to the next record of the database. * @return true if has next, false elsewhere * @throws ChangelogException When database exception raised. */ /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { if (draftCNDbCursor != null) @@ -126,12 +117,9 @@ return false; } /** * Release the resources and locks used by this Iterator. * This method must be called when the iterator is no longer used. * Failure to do it could cause DB deadlock. 
*/ public void releaseCursor() /** {@inheritDoc} */ @Override public void close() { synchronized (this) { @@ -151,6 +139,6 @@ @Override protected void finalize() { releaseCursor(); close(); } } opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -28,10 +28,6 @@ */ package org.opends.server; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; import java.io.*; import java.net.*; import java.util.*; @@ -75,9 +71,12 @@ import org.opends.server.util.EmbeddedUtils; import org.opends.server.util.LDIFReader; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; /** * This class defines some utility functions which can be used by test * cases. * This class defines some utility functions which can be used by test cases. */ @SuppressWarnings("javadoc") public final class TestCaseUtils { @@ -363,11 +362,8 @@ File snmpResourceDir = new File(buildRoot + File.separator + "src" + File.separator + "snmp" + File.separator + "resource"); File snmpConfigDir = new File(snmpResourceDir, "config"); File testSnmpResourceDir = new File (testConfigDir + File.separator + "snmp"); File testSnmpResourceDir = new File (testConfigDir + File.separator + "snmp"); if (Boolean.getBoolean(PROPERTY_COPY_CLASSES_TO_TEST_PKG)) { @@ -1045,6 +1041,11 @@ * If the directory could not be deleted. */ public static void deleteDirectory(File dir) throws IOException { if (dir == null || !dir.exists()) { return; } if (dir.isDirectory()) { // Recursively delete sub-directories and files. for (String child : dir.list()) { @@ -1548,14 +1549,9 @@ if (useAdminPort) { return LDAPModify.mainModify(adminArgs, false, null, null); } else { } return LDAPModify.mainModify(args, false, null, null); } } /** * Creates a temporary text file with the specified contents. It will be @@ -1565,8 +1561,7 @@ * * @throws Exception If an unexpected problem occurs. */ public static String createTempFile(String... lines) throws Exception public static String createTempFile(String... 
lines) throws Exception { File f = File.createTempFile("LDAPModifyTestCase", ".txt"); f.deleteOnExit(); @@ -1576,7 +1571,6 @@ { w.write(s + System.getProperty("line.separator")); } w.close(); return f.getAbsolutePath(); @@ -1594,8 +1588,9 @@ /** * Return a Map constructed via alternating key and value pairs. */ public static LinkedHashMap<String,String> makeMap(String... keyValuePairs) { LinkedHashMap<String,String> map = new LinkedHashMap<String,String>(); public static Map<String, String> makeMap(String... keyValuePairs) { Map<String, String> map = new LinkedHashMap<String, String>(); for (int i = 0; i < keyValuePairs.length; i += 2) { map.put(keyValuePairs[i], keyValuePairs[i+1]); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -2755,7 +2755,7 @@ String tn = "ECLPurgeDraftCNDbAfterChangelogClear"; debugInfo(tn, "Starting test\n\n"); { DraftCNDbHandler draftdb = replicationServer.getDraftCNDbHandler(); DraftCNDbHandler draftdb = (DraftCNDbHandler) replicationServer.getChangelogDB(); assertEquals(draftdb.count(), 8); draftdb.setPurgeDelay(1000); @@ -2764,7 +2764,7 @@ // Expect changes purged from the changelog db to be sometimes // also purged from the DraftCNDb. while(draftdb.count()>0) while (!draftdb.isEmpty()) { debugInfo(tn, "draftdb.count="+draftdb.count()); sleep(200); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -83,10 +83,9 @@ replicationServer = configureReplicationServer(100); // create or clean a directory for the dbHandler String path = getReplicationDbPath(); testRoot = createDirectory(path); testRoot = createCleanDir(); dbEnv = new ReplicationDbEnv(path, replicationServer); dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 5000); ChangeNumberGenerator gen = new ChangeNumberGenerator( 1, 0); @@ -164,7 +163,6 @@ dbEnv.shutdown(); if (replicationServer != null) replicationServer.remove(); if (testRoot != null) TestCaseUtils.deleteDirectory(testRoot); } } @@ -178,21 +176,14 @@ return new ReplicationServer(conf); } private String getReplicationDbPath() private File createCleanDir() throws IOException { String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); return path + File.separator + "unit-tests" + File.separator + "dbHandler"; } private File createDirectory(String path) throws IOException { File testRoot = new File(path); if (testRoot.exists()) { path = path + File.separator + "unit-tests" + File.separator + "dbHandler"; final File testRoot = new File(path); TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); return testRoot; } @@ -262,11 +253,8 @@ replicationServer = configureReplicationServer(100); // create or clean a directory for the dbHandler String path = getReplicationDbPath(); testRoot = createDirectory(path); dbEnv = new ReplicationDbEnv(path, replicationServer); testRoot = createCleanDir(); dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 5000); // Creates changes added to the dbHandler @@ -299,7 +287,6 @@ dbEnv.shutdown(); if (replicationServer != null) replicationServer.remove(); if (testRoot != null) 
TestCaseUtils.deleteDirectory(testRoot); } } @@ -362,12 +349,8 @@ replicationServer = configureReplicationServer(100000); // create or clean a directory for the dbHandler String path = getReplicationDbPath(); testRoot = createDirectory(path); dbEnv = new ReplicationDbEnv(path, replicationServer); testRoot = createCleanDir(); dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 10); handler.setCounterWindowSize(counterWindow); @@ -543,7 +526,6 @@ dbEnv.shutdown(); if (replicationServer != null) replicationServer.remove(); if (testRoot != null) TestCaseUtils.deleteDirectory(testRoot); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -27,9 +27,8 @@ */ package org.opends.server.replication.server.changelog.je; import static org.testng.Assert.*; import java.io.File; import java.io.IOException; import org.opends.server.TestCaseUtils; import org.opends.server.replication.ReplicationTestCase; @@ -37,12 +36,14 @@ import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; import org.opends.server.replication.server.changelog.je.DraftCNDbIterator; import org.opends.server.replication.server.changelog.je.ReplicationDbEnv; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogDBIterator; import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; import org.opends.server.util.StaticUtils; import org.testng.annotations.Test; import static org.testng.Assert.*; /** * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : * - periodic trim @@ -80,19 +81,8 @@ 2, 0, 100, null); replicationServer = new ReplicationServer(conf); // create or clean a directory for the DraftCNDbHandler String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "DraftCNDbHandler"; testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); dbEnv = new ReplicationDbEnv(path, replicationServer); testRoot = createCleanDir(); dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); handler = new DraftCNDbHandler(replicationServer, dbEnv); handler.setPurgeDelay(0); @@ -149,7 +139,7 @@ } finally { handler.releaseReadCursor(dbc); 
StaticUtils.close(dbc); } handler.setPurgeDelay(100); @@ -171,13 +161,24 @@ dbEnv.shutdown(); if (replicationServer != null) replicationServer.remove(); if (testRoot != null) TestCaseUtils.deleteDirectory(testRoot); } } private File createCleanDir() throws IOException { String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "DraftCNDbHandler"; final File testRoot = new File(path); TestCaseUtils.deleteDirectory(testRoot); testRoot.mkdirs(); return testRoot; } /** * This test makes basic operations of a DraftCNDb and explicitely call * This test makes basic operations of a DraftCNDb and explicitly calls * the clear() method instead of waiting for the periodic trim to clear * it. * - create the db @@ -205,25 +206,13 @@ 2, 0, 100, null); replicationServer = new ReplicationServer(conf); // create or clean a directory for the DraftCNDbHandler String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "DraftCNDbHandler"; testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); dbEnv = new ReplicationDbEnv(path, replicationServer); testRoot = createCleanDir(); dbEnv = new ReplicationDbEnv(testRoot.getAbsolutePath(), replicationServer); handler = new DraftCNDbHandler(replicationServer, dbEnv); handler.setPurgeDelay(0); // assertTrue(handler.count()==0); assertTrue(handler.isEmpty()); // Prepare data to be stored in the db int sn1 = 3; @@ -255,57 +244,25 @@ assertEquals(handler.count(), 3, "Db count"); assertEquals(handler.getValue(sn1),value1); assertEquals(handler.getValue(sn2),value2); assertEquals(handler.getValue(sn3),value3); 
assertEquals(handler.getPreviousCookie(sn1),value1); assertEquals(handler.getPreviousCookie(sn2),value2); assertEquals(handler.getPreviousCookie(sn3),value3); DraftCNDbIterator it = handler.generateIterator(sn1); try { assertEquals(it.getDraftCN(), sn1); assertTrue(it.next()); assertEquals(it.getDraftCN(), sn2); assertTrue(it.next()); assertEquals(it.getDraftCN(), sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } ChangelogDBIterator it = handler.generateIterator(sn1); assertIteratorReadsInOrder(it, sn1, sn2, sn3); it = handler.generateIterator(sn2); try { assertEquals(it.getDraftCN(), sn2); assertTrue(it.next()); assertEquals(it.getDraftCN(), sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } assertIteratorReadsInOrder(it, sn2, sn3); it = handler.generateIterator(sn3); try { assertEquals(it.getDraftCN(), sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } assertIteratorReadsInOrder(it, sn3); // Clear ... handler.clear(); // Check the db is cleared. assertEquals(handler.getFirstKey(), 0); assertEquals(handler.getLastKey(), 0); assertEquals(handler.count(), 0); } finally { if (handler != null) @@ -314,8 +271,25 @@ dbEnv.shutdown(); if (replicationServer != null) replicationServer.remove(); if (testRoot != null) TestCaseUtils.deleteDirectory(testRoot); } } private void assertIteratorReadsInOrder(ChangelogDBIterator it, int... sns) throws ChangelogException { try { for (int i = 0; i < sns.length; i++) { assertEquals(it.getDraftCN(), sns[i]); final boolean isNotLast = i + 1 < sns.length; assertEquals(it.next(), isNotLast); } } finally { it.close(); } } }