| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | |
| | | /** |
| | | * Specifies the last changer number requested. |
| | | */ |
| | | private int lastChangeNumber = 0; |
| | | private long lastChangeNumber = 0; |
| | | /** |
| | | * Specifies whether the change number db has been read until its end. |
| | | */ |
| | |
| | | * @throws DirectoryException |
| | | * When an error is raised. |
| | | */ |
| | | private void initializeCLSearchFromChangeNumber(int startChangeNumber) |
| | | private void initializeCLSearchFromChangeNumber(long startChangeNumber) |
| | | throws DirectoryException |
| | | { |
| | | try |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, de); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | throw de; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | Message.raw(Category.SYNC, |
| | |
| | | * @throws DirectoryException |
| | | * if a database problem occurred |
| | | */ |
| | | private String findCookie(final int startChangeNumber) |
| | | throws ChangelogException, |
| | | DirectoryException |
| | | private String findCookie(final long startChangeNumber) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexDB cnIndexDB = |
| | | replicationServer.getChangeNumberIndexDB(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | final long firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | final String crossDomainStartState = |
| | | cnIndexDB.getPreviousCookie(firstChangeNumber); |
| | | final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData(); |
| | | final long firstChangeNumber = firstCNData.getChangeNumber(); |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // Request filter DOES contain a startChangeNumber |
| | | |
| | | // Read the draftCNDb to see whether it contains startChangeNumber |
| | | String crossDomainStartState = |
| | | cnIndexDB.getPreviousCookie(startChangeNumber); |
| | | if (crossDomainStartState != null) |
| | | CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber); |
| | | if (startCNData != null) |
| | | { |
| | | // found the provided startChangeNumber, let's return it |
| | | final String crossDomainStartState = startCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // the DB, let's use the lower limit. |
| | | if (startChangeNumber < firstChangeNumber) |
| | | { |
| | | crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber); |
| | | if (crossDomainStartState != null) |
| | | CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber); |
| | | if (firstCNData != null) |
| | | { |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | final long lastKey = cnIndexDB.getLastChangeNumber(); |
| | | crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey); |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | final long lastKey = lastCNData.getChangeNumber(); |
| | | final String crossDomainStartState = lastCNData.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); |
| | | return crossDomainStartState; |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + " shutdown()"); |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | if (!domainCtxt.unRegisterHandler()) { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | |
| | | domainCtxts = null; |
| | | } |
| | | |
| | | private void releaseIterator() |
| | | private void releaseCursor() |
| | | { |
| | | if (this.cnIndexDBCursor != null) |
| | | { |
| | |
| | | oldestContext.currentState.update( |
| | | change.getUpdateMsg().getCSN()); |
| | | |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | | if (draftCompat |
| | | && lastChangeNumber > 0 |
| | | && change.getChangeNumber() > lastChangeNumber) |
| | | if (oldestContext.currentState.cover(oldestContext.stopState) |
| | | || (draftCompat |
| | | && lastChangeNumber > 0 |
| | | && change.getChangeNumber() > lastChangeNumber)) |
| | | { |
| | | oldestContext.active = false; |
| | | } |
| | |
| | | if (searchPhase == PERSISTENT_PHASE) |
| | | { |
| | | if (debugEnabled()) |
| | | clDomCtxtsToString("In getNextECLUpdate (persistent): " + |
| | | "looking for the generalized oldest change"); |
| | | TRACER.debugInfo(clDomCtxtsToString( |
| | | "In getNextECLUpdate (persistent): " |
| | | + "looking for the generalized oldest change")); |
| | | |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | domainCtxt.getNextEligibleMessageForDomain(operationId); |
| | |
| | | |
| | | if (draftCompat) |
| | | { |
| | | assignNewDraftCNAndStore(change); |
| | | assignNewChangeNumberAndStore(change); |
| | | } |
| | | oldestChange = change; |
| | | } |
| | |
| | | |
| | | if (oldestChange != null) |
| | | { |
| | | final CSN csn = oldestChange.getUpdateMsg().getCSN(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" |
| | | + oldestChange.getUpdateMsg().getCSN()); |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn); |
| | | |
| | | // Update the current state |
| | | previousCookie.update( |
| | | oldestChange.getBaseDN(), |
| | | oldestChange.getUpdateMsg().getCSN()); |
| | | previousCookie.update(oldestChange.getBaseDN(), csn); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | oldestChange.setCookie(previousCookie); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldest change =" + |
| | | oldestChange); |
| | | TRACER.debugInfo("getNextECLUpdate returns result oldestChange=" |
| | | + oldestChange); |
| | | |
| | | } |
| | | return oldestChange; |
| | |
| | | if (isEndOfCNIndexDBReached) |
| | | { |
| | | // we are at the end of the DraftCNdb in the append mode |
| | | assignNewDraftCNAndStore(oldestChange); |
| | | assignNewChangeNumberAndStore(oldestChange); |
| | | return true; |
| | | } |
| | | |
| | | |
| | | // the next change from the CNIndexDB |
| | | CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN(); |
| | | String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN(); |
| | | final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData(); |
| | | final CSN csnFromDraftCNDb = cnIndexData.getCSN(); |
| | | final String dnFromDraftCNDb = cnIndexData.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber() |
| | | + " assigning changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber()); |
| | | oldestChange.setChangeNumber(cnIndexData.getChangeNumber()); |
| | | return true; |
| | | } |
| | | |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number has" |
| | | + "skipped to changeNumber=" + cnIndexDBCursor.getChangeNumber() |
| | | + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?" |
| | | + "skipped to changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?" |
| | | + isEndOfCNIndexDBReached); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | return sameDN && sameCSN; |
| | | } |
| | | |
| | | private void assignNewDraftCNAndStore(ECLUpdateMsg change) |
| | | private void assignNewChangeNumberAndStore(ECLUpdateMsg change) |
| | | throws DirectoryException, ChangelogException |
| | | { |
| | | // generate a new change number and assign to this change |
| | |
| | | |
| | | // store in CNIndexDB the pair |
| | | // (change number of the current change, state before this change) |
| | | replicationServer.getChangeNumberIndexDB().add( |
| | | replicationServer.getChangeNumberIndexDB().add(new CNIndexData( |
| | | change.getChangeNumber(), |
| | | previousCookie.toString(), |
| | | change.getBaseDN(), |
| | | change.getUpdateMsg().getCSN()); |
| | | change.getUpdateMsg().getCSN())); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // End of INIT_PHASE => always release the iterator |
| | | releaseIterator(); |
| | | releaseCursor(); |
| | | } |
| | | |
| | | /** |
| | |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.je.DbHandler; |
| | |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | |
| | | long lastChangeNumber; |
| | | boolean dbEmpty = false; |
| | | final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); |
| | | |
| | | try |
| | | { |
| | | long firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | boolean dbEmpty = true; |
| | | long firstChangeNumber = 0; |
| | | long lastChangeNumber = 0; |
| | | |
| | | final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData(); |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | |
| | | Map<String, ServerState> domainsServerStateForLastCN = null; |
| | | CSN csnForLastCN = null; |
| | | String domainForLastCN = null; |
| | | if (firstChangeNumber < 1) |
| | | if (firstCNData != null) |
| | | { |
| | | dbEmpty = true; |
| | | firstChangeNumber = 0; |
| | | lastChangeNumber = 0; |
| | | } |
| | | else |
| | | { |
| | | lastChangeNumber = cnIndexDB.getLastChangeNumber(); |
| | | dbEmpty = false; |
| | | firstChangeNumber = firstCNData.getChangeNumber(); |
| | | lastChangeNumber = lastCNData.getChangeNumber(); |
| | | |
| | | // Get the generalized state associated with the current last change |
| | | // number and initializes from it the startStates table |
| | | String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber); |
| | | String lastCNGenState = lastCNData.getPreviousCookie(); |
| | | if (lastCNGenState != null && lastCNGenState.length() > 0) |
| | | { |
| | | domainsServerStateForLastCN = MultiDomainServerState |
| | | .splitGenStateToServerStates(lastCNGenState); |
| | | } |
| | | |
| | | csnForLastCN = cnIndexDB.getCSN(lastChangeNumber); |
| | | domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber); |
| | | csnForLastCN = lastCNData.getCSN(); |
| | | domainForLastCN = lastCNData.getBaseDN(); |
| | | } |
| | | |
| | | long newestDate = 0; |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | /** |
| | | * The Change Number Index Data class represents records stored in the |
| | | * {@link ChangeNumberIndexDB}. |
| | | */ |
| | | public class CNIndexData |
| | | { |
| | | |
| | | /** This is the key used to store the rest of the . */ |
| | | private long changeNumber; |
| | | private String previousCookie; |
| | | private String baseDN; |
| | | private CSN csn; |
| | | |
| | | /** |
| | | * Builds an instance of this class. |
| | | * |
| | | * @param changeNumber |
| | | * the change number |
| | | * @param previousCookie |
| | | * the previous cookie |
| | | * @param baseDN |
| | | * the baseDN |
| | | * @param csn |
| | | * the replication CSN field |
| | | */ |
| | | public CNIndexData(long changeNumber, String previousCookie, String baseDN, |
| | | CSN csn) |
| | | { |
| | | super(); |
| | | this.changeNumber = changeNumber; |
| | | this.previousCookie = previousCookie; |
| | | this.baseDN = baseDN; |
| | | this.csn = csn; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the baseDN field. |
| | | * |
| | | * @return the baseDN |
| | | */ |
| | | public String getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the replication CSN field. |
| | | * |
| | | * @return The replication CSN field. |
| | | */ |
| | | public CSN getCSN() |
| | | { |
| | | return csn; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the change number field. |
| | | * |
| | | * @return The change number field. |
| | | */ |
| | | public long getChangeNumber() |
| | | { |
| | | return changeNumber; |
| | | } |
| | | |
| | | /** |
| | | * Get the previous cookie field. |
| | | * |
| | | * @return the previous cookie. |
| | | */ |
| | | public String getPreviousCookie() |
| | | { |
| | | return previousCookie; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN |
| | | + " previousCookie=" + previousCookie; |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | /** |
| | | * This class stores an index of all the changes seen by this server. The index |
| | | * is sorted by a global ordering as defined in the CSN class. The index links a |
| | | * <code>changeNumber</code> to the corresponding {@link CSN}. The {@link CSN} |
| | | * then links to a corresponding change in one of the {@link ReplicaDB}s. |
| | | * This class stores an index of all the changes seen by this server in the form |
| | | * of {@link CNIndexData}. The index is sorted by a global ordering as defined |
| | | * in the CSN class. The index links a <code>changeNumber</code> to the |
| | | * corresponding CSN. The CSN then links to a corresponding change in one of the |
| | | * ReplicaDBs. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | |
| | | { |
| | | |
| | | /** |
| | | * Get the CSN associated to a provided change number. |
| | | * Get the {@link CNIndexData} record associated to a provided change number. |
| | | * |
| | | * @param changeNumber |
| | | * the provided change number. |
| | | * @return the associated CSN, null when none. |
| | | * @return the {@link CNIndexData} record, null when none. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public CSN getCSN(long changeNumber) throws ChangelogException; |
| | | CNIndexData getCNIndexData(long changeNumber) throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the baseDN associated to a provided change number. |
| | | * Get the first {@link CNIndexData} record stored in this DB. |
| | | * |
| | | * @param changeNumber |
| | | * the provided change number. |
| | | * @return the baseDN, null when none. |
| | | * @return Returns the first {@link CNIndexData} record in this DB. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public String getBaseDN(long changeNumber) throws ChangelogException; |
| | | CNIndexData getFirstCNIndexData() throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the previous cookie associated to a provided change number. |
| | | * Get the last {@link CNIndexData} record stored in this DB. |
| | | * |
| | | * @param changeNumber |
| | | * the provided change number. |
| | | * @return the previous cookie, null when none. |
| | | * @return Returns the last {@link CNIndexData} record in this DB |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | String getPreviousCookie(long changeNumber) throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the first change number stored in this DB. |
| | | * |
| | | * @return Returns the first change number in this DB. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | long getFirstChangeNumber() throws ChangelogException; |
| | | CNIndexData getLastCNIndexData() throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the last change number stored in this DB. |
| | |
| | | * This method is blocking if the size of the list of message is larger than |
| | | * its maximum. |
| | | * |
| | | * @param changeNumber |
| | | * The change number for this record in this DB. |
| | | * @param previousCookie |
| | | * The value of the previous cookie. |
| | | * @param baseDN |
| | | * The associated baseDN. |
| | | * @param csn |
| | | * The associated replication CSN. |
| | | * @param cnIndexData |
| | | * The {@link CNIndexData} record to add to this DB. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | void add(long changeNumber, String previousCookie, String baseDN, CSN csn) |
| | | throws ChangelogException; |
| | | void add(CNIndexData cnIndexData) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the |
| | |
| | | |
| | | import java.io.Closeable; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | /** |
| | | * Iterator into the changelog database. Once it is not used anymore, a |
| | | * ChangelogDBIterator must be closed to release all the resources into the |
| | |
| | | { |
| | | |
| | | /** |
| | | * Getter for the replication CSN field. |
| | | * Getter for the {@link CNIndexData} record. |
| | | * |
| | | * @return The replication CSN field. |
| | | * @return The replication CNIndexData record. |
| | | */ |
| | | CSN getCSN(); |
| | | |
| | | /** |
| | | * Getter for the baseDN field. |
| | | * |
| | | * @return The service ID. |
| | | */ |
| | | String getBaseDN(); |
| | | |
| | | /** |
| | | * Getter for the change number field. |
| | | * |
| | | * @return The change number field. |
| | | */ |
| | | long getChangeNumber(); |
| | | CNIndexData getCNIndexData(); |
| | | |
| | | /** |
| | | * Skip to the next record of the database. |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | |
| | | public class DraftCNDB |
| | | { |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private static final int DATABASE_EMPTY = 0; |
| | | |
| | | private Database db; |
| | | private ReplicationDbEnv dbenv; |
| | |
| | | |
| | | /** |
| | | * Add an entry to the database. |
| | | * @param changeNumber the provided change number. |
| | | * |
| | | * @param value the provided value to be stored associated |
| | | * with this change number. |
| | | * @param domainBaseDN the provided domainBaseDn to be stored associated |
| | | * with this change number. |
| | | * @param csn the provided replication CSN to be |
| | | * stored associated with this change number. |
| | | * @param cnIndexData |
| | | * the provided {@link CNIndexData} to be stored. |
| | | */ |
| | | public void addEntry(long changeNumber, String value, String domainBaseDN, |
| | | CSN csn) |
| | | public void addEntry(CNIndexData cnIndexData) |
| | | { |
| | | try |
| | | { |
| | | final long changeNumber = cnIndexData.getChangeNumber(); |
| | | DatabaseEntry key = new ReplicationDraftCNKey(changeNumber); |
| | | DatabaseEntry data = new DraftCNData(value, domainBaseDN, csn); |
| | | DatabaseEntry data = |
| | | new DraftCNData(changeNumber, cnIndexData.getPreviousCookie(), |
| | | cnIndexData.getBaseDN(), cnIndexData.getCSN()); |
| | | |
| | | // Use a transaction so that we can override durability. |
| | | Transaction txn = null; |
| | |
| | | |
| | | /** |
| | | * Read the first Change from the database, 0 when none. |
| | | * |
| | | * @return the first change number. |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public int readFirstChangeNumber() |
| | | public CNIndexData readFirstCNIndexData() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | ReplicationDraftCNKey key = new ReplicationDraftCNKey(); |
| | | DatabaseEntry entry = new DatabaseEntry(); |
| | | if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | |
| | | return Integer.parseInt(decodeUTF8(key.getData())); |
| | | return newCNIndexData(key, entry); |
| | | } |
| | | finally |
| | | { |
| | |
| | | catch (DatabaseException e) |
| | | { |
| | | dbenv.shutdownOnException(e); |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private CNIndexData newCNIndexData(ReplicationDraftCNKey key, |
| | | DatabaseEntry data) throws ChangelogException |
| | | { |
| | | return new DraftCNData(key.getChangeNumber(), data.getData()) |
| | | .getCNIndexData(); |
| | | } |
| | | |
| | | /** |
| | | * Return the record count. |
| | | * @return the record count. |
| | |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return DATABASE_EMPTY; |
| | | return 0; |
| | | } |
| | | |
| | | return db.count(); |
| | |
| | | { |
| | | dbCloseLock.readLock().unlock(); |
| | | } |
| | | return DATABASE_EMPTY; |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Read the last change number from the database. |
| | | * |
| | | * @return the last change number. |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public int readLastChangeNumber() |
| | | public CNIndexData readLastCNIndexData() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | ReplicationDraftCNKey key = new ReplicationDraftCNKey(); |
| | | DatabaseEntry entry = new DatabaseEntry(); |
| | | if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | |
| | | return Integer.parseInt(decodeUTF8(key.getData())); |
| | | return newCNIndexData(key, entry); |
| | | } |
| | | finally |
| | | { |
| | |
| | | catch (DatabaseException e) |
| | | { |
| | | dbenv.shutdownOnException(e); |
| | | return DATABASE_EMPTY; |
| | | return null; |
| | | } |
| | | } |
| | | |
| | |
| | | * Will be set non null for a write cursor. |
| | | */ |
| | | private final Transaction txn; |
| | | private final DatabaseEntry key; |
| | | private final DatabaseEntry entry; |
| | | private DraftCNData cnData; |
| | | private final ReplicationDraftCNKey key; |
| | | private final DatabaseEntry entry = new DatabaseEntry(); |
| | | private CNIndexData cnIndexData; |
| | | private boolean isClosed = false; |
| | | |
| | | |
| | |
| | | private DraftCNDBCursor(long startChangeNumber) throws ChangelogException |
| | | { |
| | | this.key = new ReplicationDraftCNKey(startChangeNumber); |
| | | this.entry = new DatabaseEntry(); |
| | | |
| | | // Take the lock. From now on, whatever error that happen in the life |
| | | // of this cursor should end by unlocking that lock. We must also |
| | |
| | | } |
| | | else |
| | | { |
| | | cnData = new DraftCNData(entry.getData()); |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | cnData = new DraftCNData(entry.getData()); |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private DraftCNDBCursor() throws ChangelogException |
| | | { |
| | | Transaction localTxn = null; |
| | | Cursor localCursor = null; |
| | | |
| | | this.key = new DatabaseEntry(); |
| | | this.entry = new DatabaseEntry(); |
| | | this.key = new ReplicationDraftCNKey(); |
| | | |
| | | // We'll go on only if no close or no clear is running |
| | | dbCloseLock.readLock().lock(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Getter for the value field of the current cursor. |
| | | * @return The current value field. |
| | | */ |
| | | public String currentValue() |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (cnData != null) |
| | | { |
| | | return cnData.getValue(); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the baseDN field of the current cursor. |
| | | * @return The current baseDN. |
| | | */ |
| | | public String currentBaseDN() |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (cnData != null) |
| | | { |
| | | return cnData.getBaseDN(); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the integer value of the current cursor, representing |
| | | * the current change number being processed. |
| | | * Returns the {@link CNIndexData} at the current position of the cursor. |
| | | * |
| | | * @return the current change number as an integer. |
| | | * @return The current {@link CNIndexData}. |
| | | */ |
| | | public int currentKey() |
| | | public CNIndexData currentData() |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return -1; |
| | | return null; |
| | | } |
| | | |
| | | try |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | return Integer.parseInt(str); |
| | | return cnIndexData; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication CSN associated with the current key. |
| | | * @return the replication CSN |
| | | */ |
| | | public CSN currentCSN() |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (cnData != null) |
| | | { |
| | | return cnData.getCSN(); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Go to the next record on the cursor. |
| | | * @return the next record on this cursor. |
| | |
| | | OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | cnData = null; |
| | | cnIndexData = null; |
| | | return false; |
| | | } |
| | | cnData = new DraftCNData(entry.getData()); |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the current key associated with this cursor. |
| | | * |
| | | * @return The current key associated with this cursor. |
| | | */ |
| | | public DatabaseEntry getKey() |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | return key; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | |
| | | import com.sleepycat.je.DatabaseEntry; |
| | |
| | | |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | private String value; |
| | | private String baseDN; |
| | | private CSN csn; |
| | | private long changeNumber; |
| | | private CNIndexData cnIndexData; |
| | | |
| | | /** |
| | | * Creates a record to be stored in the DraftCNDB. |
| | | * @param previousCookie The previous cookie. |
| | | * @param baseDN The baseDN (domain DN). |
| | | * @param csn The replication CSN. |
| | | * |
| | | * @param changeNumber |
| | | * the change number |
| | | * @param previousCookie |
| | | * The previous cookie |
| | | * @param baseDN |
| | | * The baseDN (domain DN) |
| | | * @param csn |
| | | * The replication CSN |
| | | */ |
| | | public DraftCNData(String previousCookie, String baseDN, CSN csn) |
| | | public DraftCNData(long changeNumber, String previousCookie, String baseDN, |
| | | CSN csn) |
| | | { |
| | | this.changeNumber = changeNumber; |
| | | String record = |
| | | previousCookie + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn; |
| | | setData(getBytes(record)); |
| | |
| | | |
| | | /** |
| | | * Creates a record to be stored in the DraftCNDB from the provided byte[]. |
| | | * @param data the provided byte[]. |
| | | * @throws ChangelogException a. |
| | | * |
| | | * @param changeNumber |
| | | * the change number |
| | | * @param data |
| | | * the provided byte[] |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public DraftCNData(byte[] data) throws ChangelogException |
| | | public DraftCNData(long changeNumber, byte[] data) throws ChangelogException |
| | | { |
| | | decodeData(data); |
| | | this.changeNumber = changeNumber; |
| | | this.cnIndexData = decodeData(changeNumber, data); |
| | | } |
| | | |
| | | /** |
| | | * Decode a record into fields. |
| | | * @param data the provided byte array. |
| | | * @throws ChangelogException when a problem occurs. |
| | | * Decode and returns a {@link CNIndexData} record. |
| | | * |
| | | * @param changeNumber |
| | | * @param data |
| | | * the provided byte array. |
| | | * @return the decoded {@link CNIndexData} record |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | public void decodeData(byte[] data) throws ChangelogException |
| | | private CNIndexData decodeData(long changeNumber, byte[] data) |
| | | throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data, "UTF-8"); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | value = str[0]; |
| | | baseDN = str[1]; |
| | | csn = new CSN(str[2]); |
| | | return new CNIndexData(changeNumber, str[0], str[1], new CSN(str[2])); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Getter for the value. |
| | | * Getter for the decoded {@link CNIndexData} record. |
| | | * |
| | | * @return the value. |
| | | * @throws ChangelogException when a problem occurs. |
| | | */ |
| | | public String getValue() throws ChangelogException |
| | | { |
| | | if (value == null) |
| | | decodeData(getData()); |
| | | return this.value; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the service ID. |
| | | * |
| | | * @return The baseDN |
| | | * @throws ChangelogException when a problem occurs. |
| | | */ |
| | | public String getBaseDN() throws ChangelogException |
| | | { |
| | | if (value == null) |
| | | decodeData(getData()); |
| | | return this.baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Getter for the replication CSN. |
| | | * |
| | | * @return the replication CSN. |
| | | * @return the CNIndexData record. |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | public CSN getCSN() throws ChangelogException |
| | | public CNIndexData getCNIndexData() throws ChangelogException |
| | | { |
| | | if (value == null) |
| | | decodeData(getData()); |
| | | return this.csn; |
| | | if (cnIndexData == null) |
| | | cnIndexData = decodeData(changeNumber, getData()); |
| | | return cnIndexData; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | toString(buffer); |
| | | return buffer.toString(); |
| | | return "DraftCNData : [" + cnIndexData + "]"; |
| | | } |
| | | |
| | | /** |
| | | * Dump a string representation of these data into the provided buffer. |
| | | * @param buffer the provided buffer. |
| | | */ |
| | | public void toString(StringBuilder buffer) |
| | | { |
| | | buffer.append("DraftCNData : [value=").append(value); |
| | | buffer.append("] [serviceID=").append(baseDN); |
| | | buffer.append("] [csn=").append(csn).append("]"); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.InitializationException; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | * FIXME Is this field that useful? {@link #getFirstChangeNumber()} does not |
| | | * even use it! |
| | | */ |
| | | private int firstChangeNumber = NO_KEY; |
| | | private long firstChangeNumber = NO_KEY; |
| | | /** |
| | | * FIXME Is this field that useful? {@link #getLastChangeNumber()} does not |
| | | * even use it! |
| | | */ |
| | | private int lastChangeNumber = NO_KEY; |
| | | private long lastChangeNumber = NO_KEY; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private boolean shutdown = false; |
| | | private boolean trimDone = false; |
| | |
| | | |
| | | // DB initialization |
| | | db = new DraftCNDB(dbenv); |
| | | firstChangeNumber = db.readFirstChangeNumber(); |
| | | lastChangeNumber = db.readLastChangeNumber(); |
| | | firstChangeNumber = getChangeNumber(db.readFirstCNIndexData()); |
| | | lastChangeNumber = getChangeNumber(db.readLastCNIndexData()); |
| | | |
| | | // Trimming thread |
| | | thread = new DirectoryThread(this, "Replication DraftCN db"); |
| | |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | private long getChangeNumber(CNIndexData cnIndexData) |
| | | throws ChangelogException |
| | | { |
| | | if (cnIndexData != null) |
| | | { |
| | | return cnIndexData.getChangeNumber(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public synchronized void add(long changeNumber, String previousCookie, |
| | | String baseDN, CSN csn) |
| | | public void add(CNIndexData cnIndexData) throws ChangelogException |
| | | { |
| | | db.addEntry(changeNumber, previousCookie, baseDN, csn); |
| | | db.addEntry(cnIndexData); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In DraftCNDbhandler.add, added: " |
| | | + " key=" + changeNumber |
| | | + " previousCookie=" + previousCookie |
| | | + " baseDN=" + baseDN |
| | | + " csn=" + csn); |
| | | TRACER.debugInfo("In DraftCNDbhandler.add, added: " + cnIndexData); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getFirstChangeNumber() |
| | | public CNIndexData getFirstCNIndexData() throws ChangelogException |
| | | { |
| | | return db.readFirstChangeNumber(); |
| | | return db.readFirstCNIndexData(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getLastChangeNumber() |
| | | public CNIndexData getLastCNIndexData() throws ChangelogException |
| | | { |
| | | return db.readLastChangeNumber(); |
| | | return db.readLastCNIndexData(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getLastChangeNumber() throws ChangelogException |
| | | { |
| | | final CNIndexData data = getLastCNIndexData(); |
| | | if (data != null) |
| | | { |
| | | return data.getChangeNumber(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // From the draftCNDb change record, get the domain and CSN |
| | | final CSN csn = cursor.currentCSN(); |
| | | final String baseDN = cursor.currentBaseDN(); |
| | | final CNIndexData data = cursor.currentData(); |
| | | final String baseDN = data.getBaseDN(); |
| | | if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN)) |
| | | { |
| | | cursor.delete(); |
| | |
| | | continue; |
| | | } |
| | | |
| | | final CSN csn = data.getCSN(); |
| | | final ServerState startState = domain.getStartState(); |
| | | final CSN fcsn = startState.getCSN(csn.getServerId()); |
| | | |
| | | final int currentChangeNumber = cursor.currentKey(); |
| | | final long currentChangeNumber = data.getChangeNumber(); |
| | | |
| | | if (csn.older(fcsn)) |
| | | { |
| | |
| | | { |
| | | Map<String, ServerState> csnStartStates = |
| | | MultiDomainServerState.splitGenStateToServerStates( |
| | | cursor.currentValue()); |
| | | data.getPreviousCookie()); |
| | | csnVector = csnStartStates.get(baseDN); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("first-draft-changenumber", |
| | | Integer.toString(db.readFirstChangeNumber()))); |
| | | attributes.add(Attributes.create("last-draft-changenumber", |
| | | Integer.toString(db.readLastChangeNumber()))); |
| | | attributes.add(Attributes.create("count", |
| | | Long.toString(count()))); |
| | | |
| | | try |
| | | { |
| | | CNIndexData firstCNData = db.readFirstCNIndexData(); |
| | | String firstCN = String.valueOf(firstCNData.getChangeNumber()); |
| | | attributes.add(Attributes.create("first-draft-changenumber", firstCN)); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | attributes.add(Attributes.create("first-draft-changenumber", "0")); |
| | | } |
| | | |
| | | try |
| | | { |
| | | CNIndexData lastCNData = db.readLastCNIndexData(); |
| | | if (lastCNData != null) |
| | | { |
| | | String lastCN = String.valueOf(lastCNData.getChangeNumber()); |
| | | attributes.add(Attributes.create("last-draft-changenumber", lastCN)); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | attributes.add(Attributes.create("last-draft-changenumber", "0")); |
| | | } |
| | | |
| | | attributes.add(Attributes.create("count", Long.toString(count()))); |
| | | return attributes; |
| | | } |
| | | |
| | |
| | | public void clear() throws ChangelogException |
| | | { |
| | | db.clear(); |
| | | firstChangeNumber = db.readFirstChangeNumber(); |
| | | lastChangeNumber = db.readLastChangeNumber(); |
| | | firstChangeNumber = getChangeNumber(db.readFirstCNIndexData()); |
| | | lastChangeNumber = getChangeNumber(db.readLastCNIndexData()); |
| | | } |
| | | |
| | | private ReentrantLock lock = new ReentrantLock(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getPreviousCookie(long changeNumber) throws ChangelogException |
| | | public CNIndexData getCNIndexData(long changeNumber) |
| | | throws ChangelogException |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentValue(); |
| | | return cursor.currentData(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getCSN(long changeNumber) throws ChangelogException |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentCSN(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getBaseDN(long changeNumber) throws ChangelogException |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentBaseDN(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.*; |
| | | import org.opends.server.types.DebugLogLevel; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getBaseDN() |
| | | public CNIndexData getCNIndexData() |
| | | { |
| | | try |
| | | { |
| | | return this.draftCNDbCursor.currentBaseDN(); |
| | | return this.draftCNDbCursor.currentData(); |
| | | } |
| | | catch(Exception e) |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return null; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getCSN() |
| | | { |
| | | try |
| | | { |
| | | return this.draftCNDbCursor.currentCSN(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getChangeNumber() |
| | | { |
| | | return ((ReplicationDraftCNKey) draftCNDbCursor.getKey()).getChangeNumber(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (draftCNDbCursor != null) |
| | |
| | | |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * Useful to create ReplicationServer keys from sequence numbers. |
| | | */ |
| | |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | /** |
| | | * Creates a ReplicationDraftCNKey that can start anywhere in the DB. |
| | | */ |
| | | public ReplicationDraftCNKey() |
| | | { |
| | | super(); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ReplicationKey from the given change number. |
| | | * @param changeNumber The change number to use. |
| | | * |
| | | * @param changeNumber |
| | | * The change number to use. |
| | | */ |
| | | public ReplicationDraftCNKey(long changeNumber) |
| | | { |
| | |
| | | */ |
| | | public long getChangeNumber() |
| | | { |
| | | return Long.valueOf(new String(getData())); |
| | | return Long.valueOf(decodeUTF8(getData())); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; |
| | |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : |
| | | * - periodic trim |
| | | * - call to clear method() |
| | | * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : - |
| | | * periodic trim - call to clear method() |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class DraftCNDbHandlerTest extends ReplicationTestCase |
| | | { |
| | | /** |
| | | * This test makes basic operations of a DraftCNDb : |
| | | * - create the db |
| | | * - add records |
| | | * - read them with a cursor |
| | | * - set a very short trim period |
| | | * - wait for the db to be trimmed / here since the changes are not stored in |
| | | * the replication changelog, the draftCNDb will be cleared. |
| | | * This test makes basic operations of a DraftCNDb : - create the db - add |
| | | * records - read them with a cursor - set a very short trim period - wait for |
| | | * the db to be trimmed / here since the changes are not stored in the |
| | | * replication changelog, the draftCNDb will be cleared. |
| | | */ |
| | | @Test() |
| | | void testDraftCNDbHandlerTrim() throws Exception |
| | |
| | | String baseDN2 = "baseDN2"; |
| | | String baseDN3 = "baseDN3"; |
| | | |
| | | CSNGenerator gen = new CSNGenerator( 1, 0); |
| | | CSNGenerator gen = new CSNGenerator(1, 0); |
| | | CSN csn1 = gen.newCSN(); |
| | | CSN csn2 = gen.newCSN(); |
| | | CSN csn3 = gen.newCSN(); |
| | | |
| | | // Add records |
| | | handler.add(cn1, value1, baseDN1, csn1); |
| | | handler.add(cn2, value2, baseDN2, csn2); |
| | | handler.add(cn3, value3, baseDN3, csn3); |
| | | handler.add(new CNIndexData(cn1, value1, baseDN1, csn1)); |
| | | handler.add(new CNIndexData(cn2, value2, baseDN2, csn2)); |
| | | handler.add(new CNIndexData(cn3, value3, baseDN3, csn3)); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final long firstChangeNumber = handler.getFirstChangeNumber(); |
| | | final long firstChangeNumber = getFirstChangeNumber(handler); |
| | | assertEquals(firstChangeNumber, cn1); |
| | | assertEquals(handler.getLastChangeNumber(), cn3); |
| | | assertEquals(getLastChangeNumber(handler), cn3); |
| | | |
| | | DraftCNDBCursor dbc = handler.getReadCursor(firstChangeNumber); |
| | | try |
| | | { |
| | | assertEquals(dbc.currentCSN(), csn1); |
| | | assertEquals(dbc.currentBaseDN(), baseDN1); |
| | | assertEquals(dbc.currentValue(), value1); |
| | | assertEqualTo(dbc.currentData(), csn1, baseDN1, value1); |
| | | assertTrue(dbc.toString().length() != 0); |
| | | |
| | | assertTrue(dbc.next()); |
| | | |
| | | assertEquals(dbc.currentCSN(), csn2); |
| | | assertEquals(dbc.currentBaseDN(), baseDN2); |
| | | assertEquals(dbc.currentValue(), value2); |
| | | assertEqualTo(dbc.currentData(), csn2, baseDN2, value2); |
| | | |
| | | assertTrue(dbc.next()); |
| | | |
| | | assertEquals(dbc.currentCSN(), csn3); |
| | | assertEquals(dbc.currentBaseDN(), baseDN3); |
| | | assertEquals(dbc.currentValue(), value3); |
| | | assertEqualTo(dbc.currentData(), csn3, baseDN3, value3); |
| | | |
| | | assertFalse(dbc.next()); |
| | | } |
| | |
| | | handler.setPurgeDelay(100); |
| | | |
| | | // Check the db is cleared. |
| | | while(handler.count()!=0) |
| | | while (handler.count() != 0) |
| | | { |
| | | Thread.sleep(200); |
| | | } |
| | | assertEquals(handler.getFirstChangeNumber(), 0); |
| | | assertEquals(handler.getLastChangeNumber(), 0); |
| | | assertEquals(getFirstChangeNumber(handler), 0); |
| | | assertEquals(getLastChangeNumber(handler), 0); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertEqualTo(CNIndexData data, CSN csn, String baseDN, |
| | | String cookie) |
| | | { |
| | | assertEquals(data.getCSN(), csn); |
| | | assertEquals(data.getBaseDN(), baseDN); |
| | | assertEquals(data.getPreviousCookie(), cookie); |
| | | } |
| | | |
| | | private File createCleanDir() throws IOException |
| | | { |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | |
| | | String baseDN2 = "baseDN2"; |
| | | String baseDN3 = "baseDN3"; |
| | | |
| | | CSNGenerator gen = new CSNGenerator( 1, 0); |
| | | CSNGenerator gen = new CSNGenerator(1, 0); |
| | | CSN csn1 = gen.newCSN(); |
| | | CSN csn2 = gen.newCSN(); |
| | | CSN csn3 = gen.newCSN(); |
| | | |
| | | // Add records |
| | | handler.add(cn1, value1, baseDN1, csn1); |
| | | handler.add(cn2, value2, baseDN2, csn2); |
| | | handler.add(cn3, value3, baseDN3, csn3); |
| | | handler.add(new CNIndexData(cn1, value1, baseDN1, csn1)); |
| | | handler.add(new CNIndexData(cn2, value2, baseDN2, csn2)); |
| | | handler.add(new CNIndexData(cn3, value3, baseDN3, csn3)); |
| | | Thread.sleep(500); |
| | | |
| | | // Checks |
| | | assertEquals(handler.getFirstChangeNumber(), cn1); |
| | | assertEquals(handler.getLastChangeNumber(), cn3); |
| | | assertEquals(getFirstChangeNumber(handler), cn1); |
| | | assertEquals(getLastChangeNumber(handler), cn3); |
| | | |
| | | assertEquals(handler.count(), 3, "Db count"); |
| | | |
| | | assertEquals(handler.getPreviousCookie(cn1), value1); |
| | | assertEquals(handler.getPreviousCookie(cn2), value2); |
| | | assertEquals(handler.getPreviousCookie(cn3), value3); |
| | | assertEquals(getPreviousCookie(handler, cn1), value1); |
| | | assertEquals(getPreviousCookie(handler, cn2), value2); |
| | | assertEquals(getPreviousCookie(handler, cn3), value3); |
| | | |
| | | ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(cn1); |
| | | assertCursorReadsInOrder(cursor, cn1, cn2, cn3); |
| | |
| | | handler.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertEquals(handler.getFirstChangeNumber(), 0); |
| | | assertEquals(handler.getLastChangeNumber(), 0); |
| | | assertEquals(getFirstChangeNumber(handler), 0); |
| | | assertEquals(getLastChangeNumber(handler), 0); |
| | | assertEquals(handler.count(), 0); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | |
| | | private long getFirstChangeNumber(ChangeNumberIndexDB handler) throws Exception |
| | | { |
| | | return handler.getFirstCNIndexData().getChangeNumber(); |
| | | } |
| | | |
| | | private long getLastChangeNumber(ChangeNumberIndexDB handler) throws Exception |
| | | { |
| | | return handler.getLastCNIndexData().getChangeNumber(); |
| | | } |
| | | |
| | | private String getPreviousCookie(DraftCNDbHandler handler, long changeNumber) throws Exception |
| | | { |
| | | ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(changeNumber); |
| | | try |
| | | { |
| | | return cursor.getCNIndexData().getPreviousCookie(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor, |
| | | int... sns) throws ChangelogException |
| | | { |
| | |
| | | { |
| | | for (int i = 0; i < sns.length; i++) |
| | | { |
| | | assertEquals(cursor.getChangeNumber(), sns[i]); |
| | | assertEquals(cursor.getCNIndexData().getChangeNumber(), sns[i]); |
| | | final boolean isNotLast = i + 1 < sns.length; |
| | | assertEquals(cursor.next(), isNotLast); |
| | | } |