| | |
| | | "[draftCompat=" + draftCompat + |
| | | "] [persistent=" + isPersistent + |
| | | "] [startChangeNumber=" + lastChangeNumber + |
| | | "] [isEndOfDraftCNReached=" + isEndOfCNIndexDBReached + |
| | | "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached + |
| | | "] [searchPhase=" + searchPhase + |
| | | "] [startCookie=" + startCookie + |
| | | "] [previousCookie=" + previousCookie + |
| | |
| | | if (startChangeNumber <= 1) |
| | | { |
| | | // Request filter DOES NOT contain any first change number |
| | | // So we'll generate from the first change number in the DraftCNdb |
| | | // So we'll generate from the first change number in the CNIndexDB |
| | | final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); |
| | | if (firstCNRecord == null) |
| | | { // DB is empty or closed |
| | |
| | | |
| | | // Request filter DOES contain a startChangeNumber |
| | | |
| | | // Read the draftCNDb to see whether it contains startChangeNumber |
| | | // Read the CNIndexDB to see whether it contains startChangeNumber |
| | | CNIndexRecord startCNRecord = cnIndexDB.getRecord(startChangeNumber); |
| | | if (startCNRecord != null) |
| | | { |
| | |
| | | return crossDomainStartState; |
| | | } |
| | | |
| | | // startChangeNumber provided in the request IS NOT in the DraftCNDb |
| | | // startChangeNumber provided in the request IS NOT in the CNIndexDB |
| | | |
| | | /* |
| | | * Get the draftLimits (from the eligibleCSN got at the beginning of the |
| | | * operation) in order to have the first and possible last change number. |
| | | * Get the changeNumberLimits (from the eligibleCSN obtained at the start of |
| | | * this method) in order to have the first and last change numbers. |
| | | */ |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | eligibleCSN, excludedBaseDNs); |
| | |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); |
| | | return crossDomainStartState; |
| | | |
| | | // TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ... |
| | | // TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ... |
| | | // this may be very long. Work on perf improvement here. |
| | | } |
| | | |
| | |
| | | startStatesFromProvidedCookie.toString() ,sb.toString())); |
| | | } |
| | | |
| | | // the next record from the DraftCNdb should be the one |
| | | // the next record from the CNIndexDB should be the one |
| | | startCookie = providedCookie; |
| | | |
| | | // Initializes each and every domain with the next(first) eligible message |
| | |
| | | private boolean assignChangeNumber(final ECLUpdateMsg oldestChange) |
| | | throws ChangelogException |
| | | { |
| | | // We also need to check if the draftCNdb is consistent with |
| | | // We also need to check if the CNIndexDB is consistent with |
| | | // the changelogdb. |
| | | // if not, 2 potential reasons |
| | | // a/ : changelog has been purged (trim): let's traverse the draftCNDb |
| | | // a/ : changelog has been purged (trim): let's traverse the CNIndexDB |
| | | // b/ : changelog is late .. let's traverse the changelogDb |
| | | // The following loop iterates until the 2 dbs are positioned on the |
| | | // same cn. |
| | |
| | | { |
| | | if (isEndOfCNIndexDBReached) |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | // we are at the end of the CNIndexDB in the append mode |
| | | assignNewChangeNumberAndStore(oldestChange); |
| | | return true; |
| | | } |
| | |
| | | |
| | | // the next change from the CNIndexDB |
| | | final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord(); |
| | | final CSN csnFromDraftCNDb = currentRecord.getCSN(); |
| | | final DN dnFromDraftCNDb = currentRecord.getBaseDN(); |
| | | final CSN csnFromCNIndexDB = currentRecord.getCSN(); |
| | | final DN dnFromCNIndexDB = currentRecord.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " comparing the 2 db DNs :" + dnFromChangelogDb + "?=" |
| | | + csnFromChangelogDb + " timestamps:" |
| | | + new Date(csnFromChangelogDb.getTime()) + " ?older" |
| | | + new Date(csnFromDraftCNDb.getTime())); |
| | | + new Date(csnFromCNIndexDB.getTime())); |
| | | |
| | | |
| | | if (areSameChange(csnFromChangelogDb, dnFromChangelogDb, |
| | | csnFromDraftCNDb, dnFromDraftCNDb)) |
| | | csnFromCNIndexDB, dnFromCNIndexDB)) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | |
| | | } |
| | | |
| | | |
| | | if (!csnFromDraftCNDb.older(csnFromChangelogDb)) |
| | | if (!csnFromCNIndexDB.older(csnFromChangelogDb)) |
| | | { |
| | | // the change from the changelogDb is older |
| | | // it should have been stored lately |
| | |
| | | } |
| | | |
| | | |
| | | // the change from the DraftCNDb is older |
| | | // the change from the CNIndexDB is older |
| | | // that means that the change has been purged from the |
| | | // changelogDb (and DraftCNdb not yet been trimmed) |
| | | // changelogDb (and CNIndexDB not yet been trimmed) |
| | | try |
| | | { |
| | | // let's traverse the DraftCNdb searching for the change |
| | | // let's traverse the CNIndexDB searching for the change |
| | | // found in the changelogDb. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " will skip " + csnFromDraftCNDb |
| | | + " will skip " + csnFromCNIndexDB |
| | | + " and read next change from the CNIndexDB."); |
| | | |
| | | isEndOfCNIndexDBReached = !cnIndexDBCursor.next(); |
| | |
| | | * large and therefore won't contain all the changes. Some changes may |
| | | * only be stored in the backing DB of the servers. |
| | | * The total size of the receive queue is calculated by doing the sum of |
| | | * the number of missing changes for every dbHandler. |
| | | * the number of missing changes for every replicaDB. |
| | | */ |
| | | ServerState latestState = replicationServerDomain.getLatestServerState(); |
| | | return ServerState.diffChanges(latestState, serverState); |
| | |
| | | // - in the ServerHandler for a given DS1, the stored state contains : |
| | | // -- the max CSN produced by DS1 |
| | | // -- the last CSN consumed by DS1 from DS2..n |
| | | // - in the RSdomain/dbHandler, the built-in state contains : |
| | | // - in the ReplicationDomainDB/ReplicaDB, the built-in state contains: |
| | | // -- the max CSN produced by each server |
| | | // So for a given DS connected we can take the state and the max from |
| | | // the DS/state. |
| | |
| | | public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN, |
| | | Set<String> excludedBaseDNs) throws DirectoryException |
| | | { |
| | | /* The content of the DraftCNdb depends on the SEARCH operations done before |
| | | * requesting the change number. If no operations, DraftCNdb is empty. |
| | | /* The content of the CNIndexDB depends on the SEARCH operations done before |
| | | * requesting the change number. If no operations, CNIndexDB is empty. |
| | | * The limits we want to get are the "potential" limits if a request was |
| | | * done, the DraftCNdb is probably not complete to do that. |
| | | * done, the CNIndexDB is probably not complete to do that. |
| | | * |
| | | * The first change number is : |
| | | * - the first record from the DraftCNdb |
| | | * - if none because DraftCNdb empty, |
| | | * - the first record from the CNIndexDB |
| | | * - if none because CNIndexDB empty, |
| | | * then |
| | | * if no change in replchangelog then return 0 |
| | | * else return 1 (change number that WILL be returned to next search) |
| | | * |
| | | * The last change number is : |
| | | * - initialized with the last record from the DraftCNdb (0 if none) |
| | | * - initialized with the last record from the CNIndexDB (0 if none) |
| | | * and consider the genState associated |
| | | * - to the last change number, we add the count of updates in the |
| | | * replchangelog FROM that genState TO the crossDomainEligibleCSN |
| | |
| | | } |
| | | else |
| | | { |
| | | // There are records in the draftDB (so already returned to clients) |
| | | // There are records in the CNIndexDB (so already returned to clients) |
| | | // BUT |
| | | // There is nothing related to this domain in the last draft record |
| | | // There is nothing related to this domain in the last CNIndexRecord |
| | | // (may be this domain was disabled when this record was returned). |
| | | // In that case, are counted the changes from |
| | | // the date of the most recent change from this last draft record |
| | | // the date of the most recent change from this last CNIndexRecord |
| | | if (newestDate == 0) |
| | | { |
| | | newestDate = csnForLastCN.getTime(); |
| | |
| | | long res = 0; |
| | | for (CSN csn : getLatestServerState()) |
| | | { |
| | | int serverId = csn.getServerId(); |
| | | CSN lStartCSN = new CSN(startCSN.getTime(), startCSN.getSeqnum(), |
| | | serverId); |
| | | CSN lStartCSN = |
| | | new CSN(startCSN.getTime(), startCSN.getSeqnum(), csn.getServerId()); |
| | | res += getCount(lStartCSN, endCSN); |
| | | } |
| | | return res; |
| | |
| | | |
| | | /** |
| | | * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the |
| | | * db managed by this DBHandler and starting at the position defined by a |
| | | * given changeNumber. |
| | | * db managed by this object and starting at the position defined by a given |
| | | * changeNumber. |
| | | * |
| | | * @param startChangeNumber |
| | | * The position where the cursor must start. |
| | | * @return a new {@link ChangeNumberIndexDBCursor} that allows to browse this DB managed by |
| | | * this DBHandler and starting at the position defined by a given |
| | | * this object and starting at the position defined by a given |
| | | * changeNumber. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | |
| | | * <ul> |
| | | * <li>a changelog of all the changes that happened on each server in the |
| | | * replication domain / suffix,</li> |
| | | * <li>a draft changelog,</li> |
| | | * <li>a changelog as defined by draft-good-ldap-changelog,</li> |
| | | * <li>a state database containing specific information about each serverId in |
| | | * the suffix, and in particular the generationId for each server.</li> |
| | | * </ul> |
| | |
| | | |
| | | /** |
| | | * This class implements the interface between the underlying database |
| | | * and the dbHandler class. |
| | | * and the {@link JEChangeNumberIndexDB} class. |
| | | * This is the only class that should have code using the BDB interfaces. |
| | | */ |
| | | public class DraftCNDB |
| | |
| | | this.dbenv = dbenv; |
| | | |
| | | // Get or create the associated ReplicationServerDomain and Db. |
| | | db = dbenv.getOrCreateDraftCNDb(); |
| | | db = dbenv.getOrCreateCNIndexDB(); |
| | | } |
| | | |
| | | /** |
| | |
| | | dbenv.clearDb(oldDb); |
| | | |
| | | // RE-create the db |
| | | db = dbenv.getOrCreateDraftCNDb(); |
| | | db = dbenv.getOrCreateCNIndexDB(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java |
| | |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | | */ |
| | | public class DraftCNDbHandler implements ChangeNumberIndexDB, Runnable |
| | | public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | | * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. |
| | | * |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param replicationServer The ReplicationServer that creates this instance. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public DraftCNDbHandler(ReplicationServer replicationServer, |
| | | public JEChangeNumberIndexDB(ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | |
| | | new AtomicLong((lastRecord != null) ? lastRecord.getChangeNumber() : 0); |
| | | |
| | | // Trimming thread |
| | | thread = new DirectoryThread(this, "Replication DraftCN db"); |
| | | thread = |
| | | new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); |
| | | thread.start(); |
| | | |
| | | // Monitoring registration |
| | |
| | | db.addRecord(record); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In DraftCNDbhandler.add, added: " + record); |
| | | TRACER.debugInfo("In JEChangeNumberIndexDB.add, added: " + record); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return new DraftCNDbIterator(db, startChangeNumber); |
| | | return new JEChangeNumberIndexDBCursor(db, startChangeNumber); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | { |
| | | // let's traverse the DraftCNDb |
| | | // let's traverse the CNIndexDB |
| | | if (!cursor.next()) |
| | | { |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | // From the CNIndexDB change record, get the domain and CSN |
| | | final CNIndexRecord record = cursor.currentRecord(); |
| | | if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN())) |
| | | { |
| | |
| | | if (domain == null) |
| | | { |
| | | // the domain has been removed since the record was written in the |
| | | // draftCNDb, thus it makes no sense to keep the record in the |
| | | // draftCNDb. |
| | | // CNIndexDB, thus it makes no sense to keep this record in the DB. |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | |
| | | csnVector = csnStartStates.get(record.getBaseDN()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("DraftCNDBHandler:clear() - ChangeVector:" |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:" |
| | | + csnVector + " -- StartState:" + startState); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // We couldn't parse the mdss from the DraftCNData Value |
| | | // We could not parse the MultiDomainServerState from the record |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | |
| | | { |
| | | cursor.delete(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("DraftCNDBHandler:clear() - deleted " + csn |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn |
| | | + "Not covering startState"); |
| | | continue; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities |
| | | * of the dbHandler. |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * JEChangeNumberIndexDB. |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "draftCNdb:" + " " + firstChangeNumber + " " + lastChangeNumber; |
| | | return "JEChangeNumberIndexDB: " + firstChangeNumber + " " |
| | | + lastChangeNumber; |
| | | } |
| | | |
| | | /** |
| File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java |
| | |
| | | * This class allows to iterate through the changes received from a given |
| | | * LDAP Server Identifier. |
| | | */ |
| | | public class DraftCNDbIterator implements ChangeNumberIndexDBCursor |
| | | public class JEChangeNumberIndexDBCursor implements ChangeNumberIndexDBCursor |
| | | { |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private DraftCNDBCursor draftCNDbCursor; |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened. |
| | | */ |
| | | public DraftCNDbIterator(DraftCNDB db, long startChangeNumber) |
| | | public JEChangeNumberIndexDBCursor(DraftCNDB db, long startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | draftCNDbCursor = db.openReadCursor(startChangeNumber); |
| | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<DN, Map<Integer, DbHandler>> sourceDbHandlers = |
| | | new ConcurrentHashMap<DN, Map<Integer, DbHandler>>(); |
| | | private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private final String dbDirectoryName; |
| | | private final File dbDirectory; |
| | |
| | | } |
| | | } |
| | | |
| | | private Map<Integer, DbHandler> getDomainMap(DN baseDN) |
| | | private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | return domainMap; |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private DbHandler getDbHandler(DN baseDN, int serverId) |
| | | private JEReplicaDB getReplicaDB(DN baseDN, int serverId) |
| | | { |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateDbHandler(baseDN, serverId, rs); |
| | | getOrCreateReplicaDB(baseDN, serverId, rs); |
| | | } |
| | | |
| | | /** |
| | | * Returns a DbHandler, possibly creating it. |
| | | * Returns a {@link JEReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN for which to create a DbHandler |
| | | * the baseDN for which to create a ReplicaDB |
| | | * @param serverId |
| | | * the baseserverId for which to create a DbHandler |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param rs |
| | | * the ReplicationServer |
| | | * @return a Pair with the DbHandler and a a boolean indicating if it has been |
| | | * created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<DbHandler, Boolean> getOrCreateDbHandler(DN baseDN, |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer rs) throws ChangelogException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | synchronized (domainToReplicaDBs) |
| | | { |
| | | Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap == null) |
| | | { |
| | | domainMap = new ConcurrentHashMap<Integer, DbHandler>(); |
| | | sourceDbHandlers.put(baseDN, domainMap); |
| | | domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | domainToReplicaDBs.put(baseDN, domainMap); |
| | | } |
| | | |
| | | DbHandler dbHandler = domainMap.get(serverId); |
| | | if (dbHandler == null) |
| | | JEReplicaDB replicaDB = domainMap.get(serverId); |
| | | if (replicaDB == null) |
| | | { |
| | | dbHandler = |
| | | new DbHandler(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, dbHandler); |
| | | return Pair.of(dbHandler, true); |
| | | replicaDB = |
| | | new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, replicaDB); |
| | | return Pair.of(replicaDB, true); |
| | | } |
| | | return Pair.of(dbHandler, false); |
| | | return Pair.of(replicaDB, false); |
| | | } |
| | | } |
| | | |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | for (DN baseDN : this.sourceDbHandlers.keySet()) |
| | | for (DN baseDN : this.domainToReplicaDBs.keySet()) |
| | | { |
| | | removeDomain(baseDN); |
| | | } |
| | |
| | | @Override |
| | | public long getCount(DN baseDN, CSN from, CSN to) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDN, from.getServerId()); |
| | | if (dbHandler != null) |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, from.getServerId()); |
| | | if (replicaDB != null) |
| | | { |
| | | return dbHandler.getCount(from, to); |
| | | return replicaDB.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | |
| | | public long getDomainChangesCount(DN baseDN) |
| | | { |
| | | long entryCount = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | entryCount += dbHandler.getChangesCount(); |
| | | entryCount += replicaDB.getChangesCount(); |
| | | } |
| | | return entryCount; |
| | | } |
| | |
| | | @Override |
| | | public void shutdownDomain(DN baseDN) |
| | | { |
| | | shutdownDbHandlers(getDomainMap(baseDN)); |
| | | sourceDbHandlers.remove(baseDN); |
| | | shutdownReplicaDBs(getDomainMap(baseDN)); |
| | | domainToReplicaDBs.remove(baseDN); |
| | | } |
| | | |
| | | private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap) |
| | | private void shutdownReplicaDBs(Map<Integer, JEReplicaDB> domainMap) |
| | | { |
| | | synchronized (domainMap) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | dbHandler.shutdown(); |
| | | replicaDB.shutdown(); |
| | | } |
| | | domainMap.clear(); |
| | | } |
| | |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | result.update(dbHandler.getOldestCSN()); |
| | | result.update(replicaDB.getOldestCSN()); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | result.update(dbHandler.getNewestCSN()); |
| | | result.update(replicaDB.getNewestCSN()); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | ChangelogException firstException = null; |
| | | |
| | | // 1- clear the replica DBs |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); |
| | | final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN); |
| | | synchronized (domainMap) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | try |
| | | { |
| | | dbHandler.clear(); |
| | | replicaDB.clear(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | firstException = e; |
| | | } |
| | | } |
| | | shutdownDbHandlers(domainMap); |
| | | sourceDbHandlers.remove(baseDN); |
| | | shutdownReplicaDBs(domainMap); |
| | | domainToReplicaDBs.remove(baseDN); |
| | | } |
| | | |
| | | // 2- clear the ChangeNumber index DB |
| | |
| | | @Override |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values()) |
| | | for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | dbHandler.setPurgeDelay(delay); |
| | | replicaDB.setPurgeDelay(delay); |
| | | } |
| | | } |
| | | } |
| | |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | if (latest == 0 || latest < dbHandler.getLatestTrimDate()) |
| | | if (latest == 0 || latest < replicaDB.getLatestTrimDate()) |
| | | { |
| | | latest = dbHandler.getLatestTrimDate(); |
| | | latest = replicaDB.getLatestTrimDate(); |
| | | } |
| | | } |
| | | return latest; |
| | |
| | | @Override |
| | | public CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | if (cursor != null && cursor.getChange() != null) |
| | | { |
| | | return cursor.getChange().getCSN(); |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new DraftCNDbHandler(replicationServer, this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | if (dbHandler != null) |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | try |
| | | { |
| | | ReplicaDBCursor cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | |
| | | public boolean publishUpdateMsg(DN baseDN, int serverId, |
| | | UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final Pair<DbHandler, Boolean> pair = |
| | | getOrCreateDbHandler(baseDN, serverId, replicationServer); |
| | | final DbHandler dbHandler = pair.getFirst(); |
| | | final Pair<JEReplicaDB, Boolean> pair = |
| | | getOrCreateReplicaDB(baseDN, serverId, replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | |
| | | dbHandler.add(updateMsg); |
| | | replicaDB.add(updateMsg); |
| | | return wasCreated; |
| | | } |
| | | |
| File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java |
| | |
| | | * |
| | | * This class publishes some monitoring information below cn=monitor. |
| | | */ |
| | | public class DbHandler implements Runnable |
| | | public class JEReplicaDB implements Runnable |
| | | { |
| | | /** |
| | | * The msgQueue holds all the updates not yet saved to stable storage. |
| | |
| | | private long trimAge; |
| | | |
| | | /** |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDN the baseDN for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param replicationServer The ReplicationServer that creates this ReplicaDB. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @param queueSize The queueSize to use when creating the dbHandler. |
| | | * @param queueSize The queueSize to use when creating the ReplicaDB. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public DbHandler(int id, DN baseDN, ReplicationServer replicationServer, |
| | | public JEReplicaDB(int id, DN baseDN, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv, int queueSize) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | |
| | | |
| | | /** |
| | | * Generate a new {@link ReplicaDBCursor} that allows to browse the db managed |
| | | * by this dbHandler and starting at the position defined by a given CSN. |
| | | * by this ReplicaDB and starting at the position defined by a given CSN. |
| | | * |
| | | * @param startAfterCSN |
| | | * The position where the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @return a new {@link ReplicaDBCursor} that allows to browse the db managed |
| | | * by this dbHandler and starting at the position defined by a given |
| | | * by this ReplicaDB and starting at the position defined by a given |
| | | * CSN. |
| | | * @throws ChangelogException |
| | | * if a database problem happened. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this dbHandler. |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities |
| | | * of the dbHandler. |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * ReplicaDB. |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the size of the msgQueue (the memory cache of the DbHandler). |
| | | * Return the size of the msgQueue (the memory cache of the ReplicaDB). |
| | | * For test purpose. |
| | | * @return The memory queue size. |
| | | */ |
| | |
| | | { |
| | | private UpdateMsg currentChange; |
| | | private ReplServerDBCursor cursor; |
| | | private DbHandler dbHandler; |
| | | private JEReplicaDB replicaDB; |
| | | private ReplicationDB db; |
| | | private CSN lastNonNullCurrentCSN; |
| | | |
| | |
| | | * @param startAfterCSN |
| | | * The CSN after which the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param dbHandler |
| | | * The associated DbHandler. |
| | | * @param replicaDB |
| | | * The associated JEReplicaDB. |
| | | * @throws ChangelogException |
| | | * if a database problem happened. |
| | | */ |
| | | public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN, |
| | | DbHandler dbHandler) throws ChangelogException |
| | | JEReplicaDB replicaDB) throws ChangelogException |
| | | { |
| | | this.db = db; |
| | | this.dbHandler = dbHandler; |
| | | this.replicaDB = replicaDB; |
| | | this.lastNonNullCurrentCSN = startAfterCSN; |
| | | |
| | | try |
| | |
| | | if (cursor == null) |
| | | { |
| | | // flush the queue into the db |
| | | dbHandler.flush(); |
| | | replicaDB.flush(); |
| | | |
| | | // look again in the db |
| | | cursor = db.openReadCursor(startAfterCSN); |
| | |
| | | cursor.close(); |
| | | cursor = null; |
| | | } |
| | | dbHandler.flush(); |
| | | replicaDB.flush(); |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN); |
| | |
| | | cursor.close(); |
| | | cursor = null; |
| | | } |
| | | this.dbHandler = null; |
| | | this.replicaDB = null; |
| | | this.db = null; |
| | | } |
| | | } |
| | |
| | | |
| | | /** |
| | | * This class implements the interface between the underlying database |
| | | * and the dbHandler class. |
| | | * and the JEReplicaDB class. |
| | | * This is the only class that should have code using the BDB interfaces. |
| | | */ |
| | | public class ReplicationDB |
| | |
| | | } |
| | | |
| | | /** |
| | | * Read the list of known servers from the database and start dbHandler for |
| | | * each of them. |
| | | * Read and return the list of known servers from the database. |
| | | * |
| | | * @return the {@link ChangelogState} read from the changelogState DB |
| | | * @throws ChangelogException |
| | |
| | | * @return the retrieved or created db. |
| | | * @throws ChangelogException when a problem occurs. |
| | | */ |
| | | public Database getOrCreateDraftCNDb() throws ChangelogException |
| | | public Database getOrCreateCNIndexDB() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.*; |
| | | import java.io.BufferedReader; |
| | | import java.io.ByteArrayOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.StringReader; |
| | | import java.net.Socket; |
| | | import java.util.*; |
| | | |
| | |
| | | import org.opends.server.replication.plugin.LDAPReplicationDomain; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; |
| | | import org.opends.server.replication.server.changelog.je.JEChangeNumberIndexDB; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.tools.LDAPSearch; |
| | | import org.opends.server.tools.LDAPWriter; |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; |
| | | import org.testng.Assert; |
| | | import org.testng.annotations.*; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | // search again the ECL, but search for first and last |
| | | ECLCompatTestLimitsAndAdd(1,8, ts); |
| | | |
| | | // Test DraftCNDb is purged when replication change log is purged |
| | | ECLPurgeDraftCNDbAfterChangelogClear(); |
| | | // Test CNIndexDB is purged when replication change log is purged |
| | | ECLPurgeCNIndexDBAfterChangelogClear(); |
| | | |
| | | // Test first and last are updated |
| | | ECLCompatTestLimits(0,0, true); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Put a short purge delay to the draftCNDB, clear the changelogDB, |
| | | * expect the draftCNDb to be purged accordingly. |
| | | * Put a short purge delay to the CNIndexDB, clear the changelogDB, expect the |
| | | * CNIndexDB to be purged accordingly. |
| | | */ |
| | | private void ECLPurgeDraftCNDbAfterChangelogClear() throws Exception |
| | | private void ECLPurgeCNIndexDBAfterChangelogClear() throws Exception |
| | | { |
| | | String tn = "ECLPurgeDraftCNDbAfterChangelogClear"; |
| | | String tn = "ECLPurgeCNIndexDBAfterChangelogClear"; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | |
| | | DraftCNDbHandler draftdb = |
| | | (DraftCNDbHandler) replicationServer.getChangeNumberIndexDB(); |
| | | assertEquals(draftdb.count(), 8); |
| | | draftdb.setPurgeDelay(1000); |
| | | JEChangeNumberIndexDB cnIndexDB = |
| | | (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); |
| | | assertEquals(cnIndexDB.count(), 8); |
| | | cnIndexDB.setPurgeDelay(1000); |
| | | |
| | | clearChangelogDB(replicationServer); |
| | | |
| | | // Expect changes purged from the changelog db to be eventually |
| | | // also purged from the DraftCNDb. |
| | | while (!draftdb.isEmpty()) |
| | | // also purged from the CNIndexDB. |
| | | while (!cnIndexDB.isEmpty()) |
| | | { |
| | | debugInfo(tn, "draftdb.count=" + draftdb.count()); |
| | | debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count()); |
| | | sleep(200); |
| | | } |
| | | |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java |
| | |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.replication.server.changelog.je.DbHandlerTest.*; |
| | | import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : - |
| | | * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : - |
| | | * periodic trim - call to the clear() method |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class DraftCNDbHandlerTest extends ReplicationTestCase |
| | | public class JEChangeNumberIndexDBTest extends ReplicationTestCase |
| | | { |
| | | /** |
| | | * This test makes basic operations of a DraftCNDb : - create the db - add |
| | | * records - read them with a cursor - set a very short trim period - wait for |
| | | * the db to be trimmed / here since the changes are not stored in the |
| | | * replication changelog, the draftCNDb will be cleared. |
| | | * This test makes basic operations of a JEChangeNumberIndexDB: |
| | | * <ol> |
| | | * <li>create the db</li> |
| | | * <li>add records</li> |
| | | * <li>read them with a cursor</li> |
| | | * <li>set a very short trim period</li> |
| | | * <li>wait for the db to be trimmed / here since the changes are not stored |
| | | * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li> |
| | | * </ol> |
| | | */ |
| | | @Test() |
| | | void testDraftCNDbHandlerTrim() throws Exception |
| | | void testTrim() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | DraftCNDbHandler handler = null; |
| | | JEChangeNumberIndexDB cnIndexDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | |
| | | 2, 0, 100, null); |
| | | replicationServer = new ReplicationServer(conf); |
| | | |
| | | handler = newDraftCNDbHandler(replicationServer); |
| | | handler.setPurgeDelay(0); |
| | | cnIndexDB = newCNIndexDB(replicationServer); |
| | | cnIndexDB.setPurgeDelay(0); |
| | | |
| | | // Prepare data to be stored in the db |
| | | int cn1 = 3; |
| | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); |
| | | handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); |
| | | handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final long firstChangeNumber = handler.getFirstRecord().getChangeNumber(); |
| | | final long firstChangeNumber = cnIndexDB.getFirstRecord().getChangeNumber(); |
| | | assertEquals(firstChangeNumber, cn1); |
| | | assertEquals(handler.getLastRecord().getChangeNumber(), cn3); |
| | | assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3); |
| | | |
| | | DraftCNDBCursor dbc = handler.getReadCursor(firstChangeNumber); |
| | | DraftCNDBCursor dbc = cnIndexDB.getReadCursor(firstChangeNumber); |
| | | try |
| | | { |
| | | assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1); |
| | |
| | | StaticUtils.close(dbc); |
| | | } |
| | | |
| | | handler.setPurgeDelay(100); |
| | | cnIndexDB.setPurgeDelay(100); |
| | | |
| | | // Check the db is cleared. |
| | | while (!handler.isEmpty()) |
| | | while (!cnIndexDB.isEmpty()) |
| | | { |
| | | Thread.sleep(200); |
| | | } |
| | | assertNull(handler.getFirstRecord()); |
| | | assertNull(handler.getLastRecord()); |
| | | assertEquals(handler.count(), 0); |
| | | assertNull(cnIndexDB.getFirstRecord()); |
| | | assertNull(cnIndexDB.getLastRecord()); |
| | | assertEquals(cnIndexDB.count(), 0); |
| | | } |
| | | finally |
| | | { |
| | | if (handler != null) |
| | | handler.shutdown(); |
| | | if (cnIndexDB != null) |
| | | cnIndexDB.shutdown(); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | assertEquals(data.getPreviousCookie(), cookie); |
| | | } |
| | | |
| | | private DraftCNDbHandler newDraftCNDbHandler(ReplicationServer rs) throws Exception |
| | | private JEChangeNumberIndexDB newCNIndexDB(ReplicationServer rs) throws Exception |
| | | { |
| | | File testRoot = createCleanDir(); |
| | | ReplicationDbEnv dbEnv = new ReplicationDbEnv(testRoot.getPath(), rs); |
| | | return new DraftCNDbHandler(rs, dbEnv); |
| | | return new JEChangeNumberIndexDB(rs, dbEnv); |
| | | } |
| | | |
| | | private File createCleanDir() throws IOException |
| | |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot |
| | | + File.separator + "build"); |
| | | path = path + File.separator + "unit-tests" + File.separator + "DraftCNDbHandler"; |
| | | path = path + File.separator + "unit-tests" + File.separator + "JEChangeNumberIndexDB"; |
| | | final File testRoot = new File(path); |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | testRoot.mkdirs(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * This test makes basic operations of a DraftCNDb and explicitly calls |
| | | * the clear() method instead of waiting for the periodic trim to clear |
| | | * This test makes basic operations of a JEChangeNumberIndexDB and explicitly |
| | | * calls the clear() method instead of waiting for the periodic trim to clear |
| | | * it. |
| | | * - create the db |
| | | * - add records |
| | | * - read them with a cursor |
| | | * - clear the db. |
| | | * @throws Exception |
| | | * <ol> |
| | | * <li>create the db</li> |
| | | * <li>add records</li> |
| | | * <li>read them with a cursor</li> |
| | | * <li>clear the db</li> |
| | | * </ol> |
| | | */ |
| | | @Test() |
| | | void testDraftCNDbHandlerClear() throws Exception |
| | | void testClear() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | DraftCNDbHandler handler = null; |
| | | JEChangeNumberIndexDB cnIndexDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | |
| | | 2, 0, 100, null); |
| | | replicationServer = new ReplicationServer(conf); |
| | | |
| | | handler = newDraftCNDbHandler(replicationServer); |
| | | handler.setPurgeDelay(0); |
| | | cnIndexDB = newCNIndexDB(replicationServer); |
| | | cnIndexDB.setPurgeDelay(0); |
| | | |
| | | assertTrue(handler.isEmpty()); |
| | | assertTrue(cnIndexDB.isEmpty()); |
| | | |
| | | // Prepare data to be stored in the db |
| | | int cn1 = 3; |
| | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); |
| | | handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); |
| | | handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1])); |
| | | cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2])); |
| | | Thread.sleep(500); |
| | | |
| | | // Checks |
| | | assertEquals(handler.getFirstRecord().getChangeNumber(), cn1); |
| | | assertEquals(handler.getLastRecord().getChangeNumber(), cn3); |
| | | assertEquals(cnIndexDB.getFirstRecord().getChangeNumber(), cn1); |
| | | assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3); |
| | | |
| | | assertEquals(handler.count(), 3, "Db count"); |
| | | assertFalse(handler.isEmpty()); |
| | | assertEquals(cnIndexDB.count(), 3, "Db count"); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | assertEquals(getPreviousCookie(handler, cn1), value1); |
| | | assertEquals(getPreviousCookie(handler, cn2), value2); |
| | | assertEquals(getPreviousCookie(handler, cn3), value3); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn1), value1); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn2), value2); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn3), value3); |
| | | |
| | | ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(cn1); |
| | | ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(cn1); |
| | | assertCursorReadsInOrder(cursor, cn1, cn2, cn3); |
| | | |
| | | cursor = handler.getCursorFrom(cn2); |
| | | cursor = cnIndexDB.getCursorFrom(cn2); |
| | | assertCursorReadsInOrder(cursor, cn2, cn3); |
| | | |
| | | cursor = handler.getCursorFrom(cn3); |
| | | cursor = cnIndexDB.getCursorFrom(cn3); |
| | | assertCursorReadsInOrder(cursor, cn3); |
| | | |
| | | handler.clear(); |
| | | cnIndexDB.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertNull(handler.getFirstRecord()); |
| | | assertNull(handler.getLastRecord()); |
| | | assertEquals(handler.count(), 0); |
| | | assertTrue(handler.isEmpty()); |
| | | assertNull(cnIndexDB.getFirstRecord()); |
| | | assertNull(cnIndexDB.getLastRecord()); |
| | | assertEquals(cnIndexDB.count(), 0); |
| | | assertTrue(cnIndexDB.isEmpty()); |
| | | } |
| | | finally |
| | | { |
| | | if (handler != null) |
| | | handler.shutdown(); |
| | | if (cnIndexDB != null) |
| | | cnIndexDB.shutdown(); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | private String getPreviousCookie(DraftCNDbHandler handler, long changeNumber) throws Exception |
| | | private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB, |
| | | long changeNumber) throws Exception |
| | | { |
| | | ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(changeNumber); |
| | | ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(changeNumber); |
| | | try |
| | | { |
| | | return cursor.getRecord().getPreviousCookie(); |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java |
| | |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the dbHandler class |
| | | * Test the JEReplicaDB class |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class DbHandlerTest extends ReplicationTestCase |
| | | public class JEReplicaDBTest extends ReplicationTestCase |
| | | { |
| | | /** The tracer object for the debug logger */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | } |
| | | |
| | | @Test(enabled=true) |
| | | void testDbHandlerTrim() throws Exception |
| | | void testTrim() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | DbHandler handler = newDbHandler(replicationServer); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 5); |
| | | |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"); |
| | | |
| | | //-- |
| | | // Iterator tests with memory queue only populated |
| | | |
| | | // verify that memory queue is populated |
| | | assertEquals(handler.getQueueSize(),3); |
| | | assertEquals(replicaDB.getQueueSize(), 3); |
| | | |
| | | assertFoundInOrder(handler, csns[0], csns[1], csns[2]); |
| | | assertNotFound(handler, csns[4]); |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | |
| | | //-- |
| | | // Iterator tests with db only populated |
| | | Thread.sleep(1000); // let the time for flush to happen |
| | | |
| | | // verify that memory queue is empty (all changes flushed in the db) |
| | | assertEquals(handler.getQueueSize(),0); |
| | | assertEquals(replicaDB.getQueueSize(), 0); |
| | | |
| | | assertFoundInOrder(handler, csns[0], csns[1], csns[2]); |
| | | assertNotFound(handler, csns[4]); |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | |
| | | assertEquals(handler.getOldestCSN(), csns[0]); |
| | | assertEquals(handler.getNewestCSN(), csns[2]); |
| | | assertEquals(replicaDB.getOldestCSN(), csns[0]); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2]); |
| | | |
| | | //-- |
| | | // Cursor tests with db and memory queue populated |
| | | // all changes in the db - add one in the memory queue |
| | | handler.add(update4); |
| | | replicaDB.add(update4); |
| | | |
| | | // verify memory queue contains this one |
| | | assertEquals(handler.getQueueSize(),1); |
| | | assertEquals(replicaDB.getQueueSize(), 1); |
| | | |
| | | assertFoundInOrder(handler, csns[0], csns[1], csns[2], csns[3]); |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]); |
| | | // Test cursor from existing CSN at the limit between queue and db |
| | | assertFoundInOrder(handler, csns[2], csns[3]); |
| | | assertFoundInOrder(handler, csns[3]); |
| | | assertNotFound(handler, csns[4]); |
| | | assertFoundInOrder(replicaDB, csns[2], csns[3]); |
| | | assertFoundInOrder(replicaDB, csns[3]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | |
| | | handler.setPurgeDelay(1); |
| | | replicaDB.setPurgeDelay(1); |
| | | |
| | | boolean purged = false; |
| | | int count = 300; // wait at most 60 seconds |
| | | while (!purged && (count > 0)) |
| | | { |
| | | CSN oldestCSN = handler.getOldestCSN(); |
| | | CSN newestCSN = handler.getNewestCSN(); |
| | | CSN oldestCSN = replicaDB.getOldestCSN(); |
| | | CSN newestCSN = replicaDB.getNewestCSN(); |
| | | if (!oldestCSN.equals(csns[3]) || !newestCSN.equals(csns[3])) |
| | | { |
| | | TestCaseUtils.sleep(100); |
| | |
| | | return new ReplicationServer(conf); |
| | | } |
| | | |
| | | private DbHandler newDbHandler(ReplicationServer replicationServer) throws Exception |
| | | private JEReplicaDB newReplicaDB(ReplicationServer replicationServer) throws Exception |
| | | { |
| | | JEChangelogDB changelogDB = (JEChangelogDB) replicationServer.getChangelogDB(); |
| | | return changelogDB.getOrCreateDbHandler(TEST_ROOT_DN, 1, replicationServer).getFirst(); |
| | | return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, replicationServer).getFirst(); |
| | | } |
| | | |
| | | private File createCleanDir() throws IOException |
| | |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot |
| | | + File.separator + "build"); |
| | | path = path + File.separator + "unit-tests" + File.separator + "dbHandler"; |
| | | path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB"; |
| | | final File testRoot = new File(path); |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | testRoot.mkdirs(); |
| | | return testRoot; |
| | | } |
| | | |
| | | private void assertFoundInOrder(DbHandler handler, CSN... csns) throws Exception |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception |
| | | { |
| | | if (csns.length == 0) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | ReplicaDBCursor cursor = handler.generateCursorFrom(csns[0]); |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(csns[0]); |
| | | try |
| | | { |
| | | assertNull(cursor.getChange()); |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertNotFound(DbHandler handler, CSN csn) |
| | | private void assertNotFound(JEReplicaDB replicaDB, CSN csn) |
| | | { |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = handler.generateCursorFrom(csn); |
| | | cursor = replicaDB.generateCursorFrom(csn); |
| | | fail("Expected exception"); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Test the feature of clearing a dbHandler used by a replication server. |
| | | * The clear feature is used when a replication server receives a request |
| | | * to reset the generationId of a given domain. |
| | | * Test the feature of clearing a JEReplicaDB used by a replication server. |
| | | * The clear feature is used when a replication server receives a request to |
| | | * reset the generationId of a given domain. |
| | | */ |
| | | @Test(enabled=true) |
| | | void testDbHandlerClear() throws Exception |
| | | void testClear() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | DbHandler handler = newDbHandler(replicationServer); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | |
| | | // Add the changes |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | |
| | | // Check they are here |
| | | assertEquals(csns[0], handler.getOldestCSN()); |
| | | assertEquals(csns[2], handler.getNewestCSN()); |
| | | assertEquals(csns[0], replicaDB.getOldestCSN()); |
| | | assertEquals(csns[2], replicaDB.getNewestCSN()); |
| | | |
| | | // Clear ... |
| | | handler.clear(); |
| | | replicaDB.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertEquals(null, handler.getOldestCSN()); |
| | | assertEquals(null, handler.getNewestCSN()); |
| | | assertEquals(null, replicaDB.getOldestCSN()); |
| | | assertEquals(null, replicaDB.getNewestCSN()); |
| | | |
| | | } |
| | | finally |
| | |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | DbHandler handler = newDbHandler(replicationServer); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6); |
| | | for (int i = 0; i < 5; i++) |
| | | { |
| | | if (i != 3) |
| | | { |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | } |
| | | handler.flush(); |
| | | replicaDB.flush(); |
| | | |
| | | cursor = handler.generateCursorFrom(csns[0]); |
| | | cursor = replicaDB.generateCursorFrom(csns[0]); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getChange().getCSN(), csns[1]); |
| | | StaticUtils.close(cursor); |
| | | |
| | | cursor = handler.generateCursorFrom(csns[3]); |
| | | cursor = replicaDB.generateCursorFrom(csns[3]); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getChange().getCSN(), csns[4]); |
| | | StaticUtils.close(cursor); |
| | | |
| | | cursor = handler.generateCursorFrom(csns[4]); |
| | | cursor = replicaDB.generateCursorFrom(csns[4]); |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getChange()); |
| | | } |
| | |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | DbHandler handler = newDbHandler(replicationServer); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5); |
| | | for (CSN csn : csns) |
| | | { |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | handler.flush(); |
| | | replicaDB.flush(); |
| | | |
| | | assertEquals(handler.getCount(csns[0], csns[0]), 1); |
| | | assertEquals(handler.getCount(csns[0], csns[1]), 2); |
| | | assertEquals(handler.getCount(csns[0], csns[4]), 5); |
| | | assertEquals(handler.getCount(null, csns[4]), 5); |
| | | assertEquals(handler.getCount(csns[0], null), 0); |
| | | assertEquals(handler.getCount(null, null), 5); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[0]), 1); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[1]), 2); |
| | | assertEquals(replicaDB.getCount(csns[0], csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(null, csns[4]), 5); |
| | | assertEquals(replicaDB.getCount(csns[0], null), 0); |
| | | assertEquals(replicaDB.getCount(null, null), 5); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Test the logic that manages counter records in the DbHandler in order to |
| | | * Test the logic that manages counter records in the JEReplicaDB in order to |
| | | * optimize the counting of records in the replication changelog db. |
| | | */ |
| | | @Test(enabled=true, groups = { "opendj-256" }) |
| | |
| | | // reproducible and always has the same value of 3004): |
| | | // |
| | | // Failed Test: |
| | | // org.opends.server.replication.server.DbHandlerTest#testDbCounts |
| | | // org.opends.server.replication.server.JEReplicaDBTest#testDbCounts |
| | | // [testng] Failure Cause: java.lang.AssertionError: AFTER PURGE |
| | | // expected:<8000> but was:<3004> |
| | | // [testng] org.testng.Assert.fail(Assert.java:84) |
| | |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:108) |
| | | // [testng] org.testng.Assert.assertEquals(Assert.java:323) |
| | | // [testng] |
| | | // org.opends.server.replication.server.DbHandlerTest.testDBCount(DbHandlerTest.java:594) |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDBCount(JEReplicaDBTest.java:594) |
| | | // [testng] |
| | | // org.opends.server.replication.server.DbHandlerTest.testDbCounts(DbHandlerTest.java:389) |
| | | // org.opends.server.replication.server.JEReplicaDBTest.testDbCounts(JEReplicaDBTest.java:389) |
| | | |
| | | // It's worth testing with 2 different settings for counterRecord |
| | | // - a counter record is put every 10 Update msg in the db - just a unit |
| | |
| | | File testRoot = null; |
| | | ReplicationServer replicationServer = null; |
| | | ReplicationDbEnv dbEnv = null; |
| | | DbHandler handler = null; |
| | | JEReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | |
| | | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); |
| | | handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10); |
| | | handler.setCounterRecordWindowSize(counterWindow); |
| | | replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10); |
| | | replicaDB.setCounterRecordWindowSize(counterWindow); |
| | | |
| | | // Populate the db with 'max' msg |
| | | int mySeqnum = 1; |
| | |
| | | for (int i=1; i<=max; i++) |
| | | { |
| | | csns[i] = new CSN(now + i, mySeqnum, 1); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | handler.flush(); |
| | | replicaDB.flush(); |
| | | |
| | | assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(handler.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | // Test count in different subcases trying to handle all special cases |
| | | // regarding the 'counter' record and 'count' algorithm |
| | | assertCount(tn, handler, csns[1], csns[1], 1, "FROM change1 TO change1 "); |
| | | assertCount(tn, handler, csns[1], csns[2], 2, "FROM change1 TO change2 "); |
| | | assertCount(tn, handler, csns[1], csns[counterWindow], counterWindow, |
| | | assertCount(tn, replicaDB, csns[1], csns[1], 1, "FROM change1 TO change1 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[2], 2, "FROM change1 TO change2 "); |
| | | assertCount(tn, replicaDB, csns[1], csns[counterWindow], counterWindow, |
| | | "FROM change1 TO counterWindow=" + counterWindow); |
| | | |
| | | final int j = counterWindow + 1; |
| | | assertCount(tn, handler, csns[1], csns[j], j, |
| | | assertCount(tn, replicaDB, csns[1], csns[j], j, |
| | | "FROM change1 TO counterWindow+1=" + j); |
| | | final int k = 2 * counterWindow; |
| | | assertCount(tn, handler, csns[1], csns[k], k, |
| | | assertCount(tn, replicaDB, csns[1], csns[k], k, |
| | | "FROM change1 TO 2*counterWindow=" + k); |
| | | final int l = k + 1; |
| | | assertCount(tn, handler, csns[1], csns[l], l, |
| | | assertCount(tn, replicaDB, csns[1], csns[l], l, |
| | | "FROM change1 TO 2*counterWindow+1=" + l); |
| | | assertCount(tn, handler, csns[2], csns[5], 4, |
| | | assertCount(tn, replicaDB, csns[2], csns[5], 4, |
| | | "FROM change2 TO change5 "); |
| | | assertCount(tn, handler, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4, |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4, |
| | | "FROM counterWindow+2 TO counterWindow+5 "); |
| | | assertCount(tn, handler, csns[2], csns[(counterWindow + 5)], counterWindow + 4, |
| | | assertCount(tn, replicaDB, csns[2], csns[(counterWindow + 5)], counterWindow + 4, |
| | | "FROM change2 TO counterWindow+5 "); |
| | | assertCount(tn, handler, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1, |
| | | assertCount(tn, replicaDB, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1, |
| | | "FROM counterWindow+4 TO counterWindow+4 "); |
| | | |
| | | CSN olderThanOldest = null; |
| | |
| | | |
| | | // Now we want to test with start and stop outside of the db |
| | | |
| | | assertCount(tn, handler, csns[1], newerThanNewest, max, |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | assertCount(tn, handler, olderThanOldest, newerThanNewest, max, |
| | | assertCount(tn, replicaDB, olderThanOldest, newerThanNewest, max, |
| | | "FROM null (start of time) TO now (> newest change in the db)"); |
| | | |
| | | // Now we want to test that after closing and reopening the db, the |
| | | // counting algo is well reinitialized and when new messages are added |
| | | // the new counter are correctly generated. |
| | | debugInfo(tn,"SHUTDOWN handler and recreate"); |
| | | handler.shutdown(); |
| | | debugInfo(tn, "SHUTDOWN replicaDB and recreate"); |
| | | replicaDB.shutdown(); |
| | | |
| | | handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10); |
| | | handler.setCounterRecordWindowSize(counterWindow); |
| | | replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10); |
| | | replicaDB.setCounterRecordWindowSize(counterWindow); |
| | | |
| | | assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(handler.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, handler, csns[1], newerThanNewest, max, |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=(2*max); i++) |
| | | { |
| | | csns[i] = new CSN(now + i, mySeqnum, 1); |
| | | handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | handler.flush(); |
| | | replicaDB.flush(); |
| | | |
| | | assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(handler.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | |
| | | assertCount(tn, handler, csns[1], newerThanNewest, 2 * max, |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, 2 * max, |
| | | "FROM our first generated change TO now (> newest change in the db)"); |
| | | |
| | | // |
| | | |
| | | handler.setPurgeDelay(100); |
| | | replicaDB.setPurgeDelay(100); |
| | | sleep(4000); |
| | | long totalCount = handler.getCount(null, null); |
| | | long totalCount = replicaDB.getCount(null, null); |
| | | debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | | debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN()); |
| | | assertEquals(handler.getNewestCSN(), csns[2 * max], "Newest="); |
| | | debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN()); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest="); |
| | | |
| | | int expectedCnt; |
| | | if (totalCount>1) |
| | | { |
| | | final int newestSeqnum = handler.getNewestCSN().getSeqnum(); |
| | | final int oldestSeqnum = handler.getOldestCSN().getSeqnum(); |
| | | final int newestSeqnum = replicaDB.getNewestCSN().getSeqnum(); |
| | | final int oldestSeqnum = replicaDB.getOldestCSN().getSeqnum(); |
| | | expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1; |
| | | } |
| | | else |
| | | { |
| | | expectedCnt = 0; |
| | | } |
| | | assertCount(tn, handler, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE"); |
| | | assertCount(tn, replicaDB, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE"); |
| | | |
| | | // Clear ... |
| | | debugInfo(tn,"clear:"); |
| | | handler.clear(); |
| | | replicaDB.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertEquals(null, handler.getOldestCSN()); |
| | | assertEquals(null, handler.getNewestCSN()); |
| | | assertEquals(null, replicaDB.getOldestCSN()); |
| | | assertEquals(null, replicaDB.getNewestCSN()); |
| | | debugInfo(tn,"Success"); |
| | | } |
| | | finally |
| | | { |
| | | if (handler != null) |
| | | handler.shutdown(); |
| | | if (replicaDB != null) |
| | | replicaDB.shutdown(); |
| | | if (dbEnv != null) |
| | | dbEnv.shutdown(); |
| | | remove(replicationServer); |
| | |
| | | } |
| | | } |
| | | |
// Test helper: calls getCount(from, to) on the given changelog DB object, logs the
// returned value through debugInfo (tagged with the test name 'tn'), and asserts
// that it equals expectedCount.
// NOTE(review): this excerpt is a rendered diff, so the signature line and the
// getCount line each appear twice: first in the DbHandler/handler form, then in
// the JEReplicaDB/replicaDB form (presumably the pre- and post-rename versions;
// confirm against the actual file before compiling).
| | | private void assertCount(String tn, DbHandler handler, CSN from, CSN to, |
| | | private void assertCount(String tn, JEReplicaDB replicaDB, CSN from, CSN to, |
| | | int expectedCount, String testcase) |
| | | { |
| | | long actualCount = handler.getCount(from, to); |
| | | long actualCount = replicaDB.getCount(from, to); |
// 'testcase' prefixes the debug line so the log shows which scenario produced the count.
| | | debugInfo(tn, testcase + " actualCount=" + actualCount); |
// Argument order here is (actual, expected, testcase); the third argument is
// presumably the assertion failure message -- confirm against the assertEquals in use.
| | | assertEquals(actualCount, expectedCount, testcase); |
| | | } |