OPENDJ-1116 Introduce abstraction for the changelog DB
Renamed CNIndexData to CNIndexRecord.
ChangeNumberIndexDB.java:
Renamed getCNIndexData(changeNumber), getFirstCNIndexData(), getLastCNIndexData() to getRecord(changeNumber), getFirstRecord(), getLastRecord(). Renamed add(cnIndexData) to addRecord(record).
ChangeNumberIndexDBCursor.java:
Renamed getCNIndexData() to getRecord().
*.java:
Consequence of these changes.
1 file renamed
9 files modified
| | |
| | | { |
| | | // Request filter DOES NOT contain any first change number |
| | | // So we'll generate from the first change number in the DraftCNdb |
| | | final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData(); |
| | | if (firstCNData == null) |
| | | final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); |
| | | if (firstCNRecord == null) |
| | | { // DB is empty or closed |
| | | isEndOfCNIndexDBReached = true; |
| | | return null; |
| | | } |
| | | |
| | | final long firstChangeNumber = firstCNData.getChangeNumber(); |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | final long firstChangeNumber = firstCNRecord.getChangeNumber(); |
| | | final String crossDomainStartState = firstCNRecord.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // Request filter DOES contain a startChangeNumber |
| | | |
| | | // Read the draftCNDb to see whether it contains startChangeNumber |
| | | CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber); |
| | | if (startCNData != null) |
| | | CNIndexRecord startCNRecord = cnIndexDB.getRecord(startChangeNumber); |
| | | if (startCNRecord != null) |
| | | { |
| | | // found the provided startChangeNumber, let's return it |
| | | final String crossDomainStartState = startCNData.getPreviousCookie(); |
| | | final String crossDomainStartState = startCNRecord.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | // the DB, let's use the lower limit. |
| | | if (startChangeNumber < firstChangeNumber) |
| | | { |
| | | CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber); |
| | | if (firstCNData != null) |
| | | CNIndexRecord firstCNRecord = cnIndexDB.getRecord(firstChangeNumber); |
| | | if (firstCNRecord != null) |
| | | { |
| | | final String crossDomainStartState = firstCNData.getPreviousCookie(); |
| | | final String crossDomainStartState = firstCNRecord.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | | return crossDomainStartState; |
| | | } |
| | |
| | | { |
| | | // startChangeNumber is between first and potential last and has never |
| | | // been returned yet |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | if (lastCNData == null) |
| | | final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); |
| | | if (lastCNRecord == null) |
| | | { |
| | | isEndOfCNIndexDBReached = true; |
| | | return null; |
| | | } |
| | | |
| | | final long lastKey = lastCNData.getChangeNumber(); |
| | | final String crossDomainStartState = lastCNData.getPreviousCookie(); |
| | | final long lastKey = lastCNRecord.getChangeNumber(); |
| | | final String crossDomainStartState = lastCNRecord.getPreviousCookie(); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); |
| | | return crossDomainStartState; |
| | | |
| | |
| | | |
| | | |
| | | // the next change from the CNIndexDB |
| | | final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData(); |
| | | final CSN csnFromDraftCNDb = cnIndexData.getCSN(); |
| | | final String dnFromDraftCNDb = cnIndexData.getBaseDN(); |
| | | final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord(); |
| | | final CSN csnFromDraftCNDb = currentRecord.getCSN(); |
| | | final String dnFromDraftCNDb = currentRecord.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | | + " assigning changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " assigning changeNumber=" + currentRecord.getChangeNumber() |
| | | + " to change=" + oldestChange); |
| | | |
| | | oldestChange.setChangeNumber(cnIndexData.getChangeNumber()); |
| | | oldestChange.setChangeNumber(currentRecord.getChangeNumber()); |
| | | return true; |
| | | } |
| | | |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number has" |
| | | + "skipped to changeNumber=" + cnIndexData.getChangeNumber() |
| | | + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?" |
| | | + "skipped to changeNumber=" + currentRecord.getChangeNumber() |
| | | + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?" |
| | | + isEndOfCNIndexDBReached); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | |
| | | // store in CNIndexDB the pair |
| | | // (change number of the current change, state before this change) |
| | | replicationServer.getChangeNumberIndexDB().add(new CNIndexData( |
| | | replicationServer.getChangeNumberIndexDB().addRecord(new CNIndexRecord( |
| | | change.getChangeNumber(), |
| | | previousCookie.toString(), |
| | | change.getBaseDN(), |
| | |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.je.DbHandler; |
| | |
| | | if (cnIndexDB == null) |
| | | { |
| | | cnIndexDB = new DraftCNDbHandler(this, this.dbEnv); |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); |
| | | // initialization of the lastGeneratedChangeNumber from the DB content |
| | | // if DB is empty => lastCNData is null => Default to 0 |
| | | // if DB is empty => last record does not exist => default to 0 |
| | | lastGeneratedChangeNumber = |
| | | (lastCNData != null) ? lastCNData.getChangeNumber() : 0; |
| | | (lastCNRecord != null) ? lastCNRecord.getChangeNumber() : 0; |
| | | } |
| | | return cnIndexDB; |
| | | } |
| | |
| | | long firstChangeNumber = 0; |
| | | long lastChangeNumber = 0; |
| | | |
| | | final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData(); |
| | | final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData(); |
| | | final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); |
| | | final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); |
| | | |
| | | Map<String, ServerState> domainsServerStateForLastCN = null; |
| | | CSN csnForLastCN = null; |
| | | String domainForLastCN = null; |
| | | if (firstCNData != null) |
| | | if (firstCNRecord != null) |
| | | { |
| | | if (lastCNData == null) |
| | | if (lastCNRecord == null) |
| | | { |
| | | // Edge case: DB was cleaned or closed in between call to getFirst*() |
| | | // and getLast*(). The only remaining solution is to fail fast. |
| | |
| | | } |
| | | |
| | | dbEmpty = false; |
| | | firstChangeNumber = firstCNData.getChangeNumber(); |
| | | lastChangeNumber = lastCNData.getChangeNumber(); |
| | | firstChangeNumber = firstCNRecord.getChangeNumber(); |
| | | lastChangeNumber = lastCNRecord.getChangeNumber(); |
| | | |
| | | // Get the generalized state associated with the current last change |
| | | // number and initializes from it the startStates table |
| | | String lastCNGenState = lastCNData.getPreviousCookie(); |
| | | String lastCNGenState = lastCNRecord.getPreviousCookie(); |
| | | if (lastCNGenState != null && lastCNGenState.length() > 0) |
| | | { |
| | | domainsServerStateForLastCN = MultiDomainServerState |
| | | .splitGenStateToServerStates(lastCNGenState); |
| | | } |
| | | |
| | | csnForLastCN = lastCNData.getCSN(); |
| | | domainForLastCN = lastCNData.getBaseDN(); |
| | | csnForLastCN = lastCNRecord.getCSN(); |
| | | domainForLastCN = lastCNRecord.getBaseDN(); |
| | | } |
| | | |
| | | long newestDate = 0; |
| File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/CNIndexData.java |
| | |
| | | * The Change Number Index Data class represents records stored in the |
| | | * {@link ChangeNumberIndexDB}. |
| | | */ |
| | | public class CNIndexData |
| | | public class CNIndexRecord |
| | | { |
| | | |
| | | /** This is the key used to store the rest of the record. */ |
| | |
| | | * @param csn |
| | | * the replication CSN field |
| | | */ |
| | | public CNIndexData(long changeNumber, String previousCookie, String baseDN, |
| | | public CNIndexRecord(long changeNumber, String previousCookie, String baseDN, |
| | | CSN csn) |
| | | { |
| | | super(); |
| | |
| | | |
| | | /** |
| | | * This class stores an index of all the changes seen by this server in the form |
| | | * of {@link CNIndexData}. The index is sorted by a global ordering as defined |
| | | * in the CSN class. The index links a <code>changeNumber</code> to the |
| | | * of {@link CNIndexRecord}s. The records are sorted by a global ordering as |
| | | * defined in the CSN class. The index links a <code>changeNumber</code> to the |
| | | * corresponding CSN. The CSN then links to a corresponding change in one of the |
| | | * ReplicaDBs. |
| | | * |
| | |
| | | { |
| | | |
| | | /** |
| | | * Get the {@link CNIndexData} record associated to a provided change number. |
| | | * Get the record associated to a provided change number. |
| | | * |
| | | * @param changeNumber |
| | | * the provided change number. |
| | | * @return the {@link CNIndexData} record, null when none. |
| | | * @return the {@link CNIndexRecord}, null when none. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | CNIndexData getCNIndexData(long changeNumber) throws ChangelogException; |
| | | CNIndexRecord getRecord(long changeNumber) throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the first {@link CNIndexData} record stored in this DB. |
| | | * Get the first record stored in this DB. |
| | | * |
| | | * @return Returns the first {@link CNIndexData} record in this DB. |
| | | * @return Returns the first {@link CNIndexRecord} in this DB, null when the |
| | | * DB is empty or closed |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | CNIndexData getFirstCNIndexData() throws ChangelogException; |
| | | CNIndexRecord getFirstRecord() throws ChangelogException; |
| | | |
| | | /** |
| | | * Get the last {@link CNIndexData} record stored in this DB. |
| | | * Get the last record stored in this DB. |
| | | * |
| | | * @return Returns the last {@link CNIndexData} record in this DB |
| | | * @return Returns the last {@link CNIndexRecord} in this DB, null when the DB |
| | | * is empty or closed |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | CNIndexData getLastCNIndexData() throws ChangelogException; |
| | | CNIndexRecord getLastRecord() throws ChangelogException; |
| | | |
| | | /** |
| | | * Add an update to the list of messages that must be saved to this DB managed |
| | |
| | | * This method is blocking if the size of the list of messages is larger than |
| | | * its maximum. |
| | | * |
| | | * @param cnIndexData |
| | | * The {@link CNIndexData} record to add to this DB. |
| | | * @param record |
| | | * The {@link CNIndexRecord} to add to this DB. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | void add(CNIndexData cnIndexData) throws ChangelogException; |
| | | void addRecord(CNIndexRecord record) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the |
| | |
| | | { |
| | | |
| | | /** |
| | | * Getter for the {@link CNIndexData} record. |
| | | * Getter for the record. |
| | | * |
| | | * @return The replication CNIndexData record. |
| | | * @return The current {@link CNIndexRecord}. |
| | | */ |
| | | CNIndexData getCNIndexData(); |
| | | CNIndexRecord getRecord(); |
| | | |
| | | /** |
| | | * Skip to the next record of the database. |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an entry to the database. |
| | | * Add a record to the database. |
| | | * |
| | | * @param cnIndexData |
| | | * the provided {@link CNIndexData} to be stored. |
| | | * @param record |
| | | * the provided {@link CNIndexRecord} to be stored. |
| | | */ |
| | | public void addEntry(CNIndexData cnIndexData) |
| | | public void addRecord(CNIndexRecord record) |
| | | { |
| | | try |
| | | { |
| | | final long changeNumber = cnIndexData.getChangeNumber(); |
| | | final long changeNumber = record.getChangeNumber(); |
| | | DatabaseEntry key = new ReplicationDraftCNKey(changeNumber); |
| | | DatabaseEntry data = |
| | | new DraftCNData(changeNumber, cnIndexData.getPreviousCookie(), |
| | | cnIndexData.getBaseDN(), cnIndexData.getCSN()); |
| | | DatabaseEntry data = new DraftCNData(changeNumber, |
| | | record.getPreviousCookie(), record.getBaseDN(), record.getCSN()); |
| | | |
| | | // Use a transaction so that we can override durability. |
| | | Transaction txn = null; |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public CNIndexData readFirstCNIndexData() throws ChangelogException |
| | | public CNIndexRecord readFirstRecord() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | return newCNIndexData(key, entry); |
| | | return newCNIndexRecord(key, entry); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private CNIndexData newCNIndexData(ReplicationDraftCNKey key, |
| | | private CNIndexRecord newCNIndexRecord(ReplicationDraftCNKey key, |
| | | DatabaseEntry data) throws ChangelogException |
| | | { |
| | | return new DraftCNData(key.getChangeNumber(), data.getData()) |
| | | .getCNIndexData(); |
| | | return new DraftCNData(key.getChangeNumber(), data.getData()).getRecord(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public CNIndexData readLastCNIndexData() throws ChangelogException |
| | | public CNIndexRecord readLastRecord() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | return newCNIndexData(key, entry); |
| | | return newCNIndexRecord(key, entry); |
| | | } |
| | | finally |
| | | { |
| | |
| | | private final Transaction txn; |
| | | private final ReplicationDraftCNKey key; |
| | | private final DatabaseEntry entry = new DatabaseEntry(); |
| | | private CNIndexData cnIndexData; |
| | | private CNIndexRecord record; |
| | | private boolean isClosed = false; |
| | | |
| | | |
| | |
| | | } |
| | | else |
| | | { |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | record = newCNIndexRecord(this.key, entry); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | record = newCNIndexRecord(this.key, entry); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the {@link CNIndexData} at the current position of the cursor. |
| | | * Returns the {@link CNIndexRecord} at the current position of the cursor. |
| | | * |
| | | * @return The current {@link CNIndexData}. |
| | | * @return The current {@link CNIndexRecord}. |
| | | */ |
| | | public CNIndexData currentData() |
| | | public CNIndexRecord currentRecord() |
| | | { |
| | | if (isClosed) |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | return cnIndexData; |
| | | return record; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | cnIndexData = null; |
| | | record = null; |
| | | return false; |
| | | } |
| | | cnIndexData = newCNIndexData(this.key, entry); |
| | | record = newCNIndexRecord(this.key, entry); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | |
| | | import com.sleepycat.je.DatabaseEntry; |
| | |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | private long changeNumber; |
| | | private CNIndexData cnIndexData; |
| | | private CNIndexRecord record; |
| | | |
| | | /** |
| | | * Creates a record to be stored in the DraftCNDB. |
| | |
| | | public DraftCNData(long changeNumber, byte[] data) throws ChangelogException |
| | | { |
| | | this.changeNumber = changeNumber; |
| | | this.cnIndexData = decodeData(changeNumber, data); |
| | | this.record = decodeData(changeNumber, data); |
| | | } |
| | | |
| | | /** |
| | | * Decode and returns a {@link CNIndexData} record. |
| | | * Decode and returns a {@link CNIndexRecord}. |
| | | * |
| | | * @param changeNumber |
| | | * @param data |
| | | * the provided byte array. |
| | | * @return the decoded {@link CNIndexData} record |
| | | * @return the decoded {@link CNIndexRecord} |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | private CNIndexData decodeData(long changeNumber, byte[] data) |
| | | private CNIndexRecord decodeData(long changeNumber, byte[] data) |
| | | throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data, "UTF-8"); |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | return new CNIndexData(changeNumber, str[0], str[1], new CSN(str[2])); |
| | | return new CNIndexRecord(changeNumber, str[0], str[1], new CSN(str[2])); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Getter for the decoded {@link CNIndexData} record. |
| | | * Getter for the decoded record. |
| | | * |
| | | * @return the CNIndexData record. |
| | | * @return the {@link CNIndexRecord} record. |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | public CNIndexData getCNIndexData() throws ChangelogException |
| | | public CNIndexRecord getRecord() throws ChangelogException |
| | | { |
| | | if (cnIndexData == null) |
| | | cnIndexData = decodeData(changeNumber, getData()); |
| | | return cnIndexData; |
| | | if (record == null) |
| | | record = decodeData(changeNumber, getData()); |
| | | return record; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "DraftCNData : [" + cnIndexData + "]"; |
| | | return "DraftCNData : [" + record + "]"; |
| | | } |
| | | |
| | | } |
| | |
| | | |
| | | // DB initialization |
| | | db = new DraftCNDB(dbenv); |
| | | firstChangeNumber = getChangeNumber(db.readFirstCNIndexData()); |
| | | lastChangeNumber = getChangeNumber(db.readLastCNIndexData()); |
| | | firstChangeNumber = getChangeNumber(db.readFirstRecord()); |
| | | lastChangeNumber = getChangeNumber(db.readLastRecord()); |
| | | |
| | | // Trimming thread |
| | | thread = new DirectoryThread(this, "Replication DraftCN db"); |
| | |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | private long getChangeNumber(CNIndexData cnIndexData) |
| | | throws ChangelogException |
| | | private long getChangeNumber(CNIndexRecord record) throws ChangelogException |
| | | { |
| | | if (cnIndexData != null) |
| | | if (record != null) |
| | | { |
| | | return cnIndexData.getChangeNumber(); |
| | | return record.getChangeNumber(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void add(CNIndexData cnIndexData) throws ChangelogException |
| | | public void addRecord(CNIndexRecord record) throws ChangelogException |
| | | { |
| | | db.addEntry(cnIndexData); |
| | | db.addRecord(record); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In DraftCNDbhandler.add, added: " + cnIndexData); |
| | | TRACER.debugInfo("In DraftCNDbhandler.add, added: " + record); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CNIndexData getFirstCNIndexData() throws ChangelogException |
| | | public CNIndexRecord getFirstRecord() throws ChangelogException |
| | | { |
| | | return db.readFirstCNIndexData(); |
| | | return db.readFirstRecord(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CNIndexData getLastCNIndexData() throws ChangelogException |
| | | public CNIndexRecord getLastRecord() throws ChangelogException |
| | | { |
| | | return db.readLastCNIndexData(); |
| | | return db.readLastRecord(); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public boolean isEmpty() throws ChangelogException |
| | | { |
| | | return getLastCNIndexData() == null; |
| | | return getLastRecord() == null; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // From the draftCNDb change record, get the domain and CSN |
| | | final CNIndexData data = cursor.currentData(); |
| | | final String baseDN = data.getBaseDN(); |
| | | final CNIndexRecord record = cursor.currentRecord(); |
| | | final String baseDN = record.getBaseDN(); |
| | | if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN)) |
| | | { |
| | | cursor.delete(); |
| | |
| | | continue; |
| | | } |
| | | |
| | | final CSN csn = data.getCSN(); |
| | | final CSN csn = record.getCSN(); |
| | | final ServerState startState = domain.getStartState(); |
| | | final CSN fcsn = startState.getCSN(csn.getServerId()); |
| | | |
| | | final long currentChangeNumber = data.getChangeNumber(); |
| | | final long currentChangeNumber = record.getChangeNumber(); |
| | | |
| | | if (csn.older(fcsn)) |
| | | { |
| | |
| | | { |
| | | Map<String, ServerState> csnStartStates = |
| | | MultiDomainServerState.splitGenStateToServerStates( |
| | | data.getPreviousCookie()); |
| | | record.getPreviousCookie()); |
| | | csnVector = csnStartStates.get(baseDN); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | |
| | | try |
| | | { |
| | | CNIndexData firstCNData = db.readFirstCNIndexData(); |
| | | String firstCN = String.valueOf(firstCNData.getChangeNumber()); |
| | | CNIndexRecord firstCNRecord = db.readFirstRecord(); |
| | | String firstCN = String.valueOf(firstCNRecord.getChangeNumber()); |
| | | attributes.add(Attributes.create("first-draft-changenumber", firstCN)); |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | |
| | | try |
| | | { |
| | | CNIndexData lastCNData = db.readLastCNIndexData(); |
| | | if (lastCNData != null) |
| | | CNIndexRecord lastCNRecord = db.readLastRecord(); |
| | | if (lastCNRecord != null) |
| | | { |
| | | String lastCN = String.valueOf(lastCNData.getChangeNumber()); |
| | | String lastCN = String.valueOf(lastCNRecord.getChangeNumber()); |
| | | attributes.add(Attributes.create("last-draft-changenumber", lastCN)); |
| | | } |
| | | } |
| | |
| | | public void clear() throws ChangelogException |
| | | { |
| | | db.clear(); |
| | | firstChangeNumber = getChangeNumber(db.readFirstCNIndexData()); |
| | | lastChangeNumber = getChangeNumber(db.readLastCNIndexData()); |
| | | firstChangeNumber = getChangeNumber(db.readFirstRecord()); |
| | | lastChangeNumber = getChangeNumber(db.readLastRecord()); |
| | | } |
| | | |
| | | private ReentrantLock lock = new ReentrantLock(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CNIndexData getCNIndexData(long changeNumber) |
| | | public CNIndexRecord getRecord(long changeNumber) |
| | | throws ChangelogException |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentData(); |
| | | return cursor.currentRecord(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CNIndexData getCNIndexData() |
| | | public CNIndexRecord getRecord() |
| | | { |
| | | try |
| | | { |
| | | return this.draftCNDbCursor.currentData(); |
| | | return this.draftCNDbCursor.currentRecord(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexData; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; |
| | |
| | | CSN csn3 = gen.newCSN(); |
| | | |
| | | // Add records |
| | | handler.add(new CNIndexData(cn1, value1, baseDN1, csn1)); |
| | | handler.add(new CNIndexData(cn2, value2, baseDN2, csn2)); |
| | | handler.add(new CNIndexData(cn3, value3, baseDN3, csn3)); |
| | | handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csn1)); |
| | | handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csn2)); |
| | | handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csn3)); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final long firstChangeNumber = handler.getFirstCNIndexData().getChangeNumber(); |
| | | final long firstChangeNumber = handler.getFirstRecord().getChangeNumber(); |
| | | assertEquals(firstChangeNumber, cn1); |
| | | assertEquals(handler.getLastCNIndexData().getChangeNumber(), cn3); |
| | | assertEquals(handler.getLastRecord().getChangeNumber(), cn3); |
| | | |
| | | DraftCNDBCursor dbc = handler.getReadCursor(firstChangeNumber); |
| | | try |
| | | { |
| | | assertEqualTo(dbc.currentData(), csn1, baseDN1, value1); |
| | | assertEqualTo(dbc.currentRecord(), csn1, baseDN1, value1); |
| | | assertTrue(dbc.toString().length() != 0); |
| | | |
| | | assertTrue(dbc.next()); |
| | | assertEqualTo(dbc.currentData(), csn2, baseDN2, value2); |
| | | assertEqualTo(dbc.currentRecord(), csn2, baseDN2, value2); |
| | | |
| | | assertTrue(dbc.next()); |
| | | assertEqualTo(dbc.currentData(), csn3, baseDN3, value3); |
| | | assertEqualTo(dbc.currentRecord(), csn3, baseDN3, value3); |
| | | |
| | | assertFalse(dbc.next()); |
| | | } |
| | |
| | | { |
| | | Thread.sleep(200); |
| | | } |
| | | assertNull(handler.getFirstCNIndexData()); |
| | | assertNull(handler.getLastCNIndexData()); |
| | | assertNull(handler.getFirstRecord()); |
| | | assertNull(handler.getLastRecord()); |
| | | assertEquals(handler.count(), 0); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertEqualTo(CNIndexData data, CSN csn, String baseDN, |
| | | private void assertEqualTo(CNIndexRecord data, CSN csn, String baseDN, |
| | | String cookie) |
| | | { |
| | | assertEquals(data.getCSN(), csn); |
| | |
| | | CSN csn3 = gen.newCSN(); |
| | | |
| | | // Add records |
| | | handler.add(new CNIndexData(cn1, value1, baseDN1, csn1)); |
| | | handler.add(new CNIndexData(cn2, value2, baseDN2, csn2)); |
| | | handler.add(new CNIndexData(cn3, value3, baseDN3, csn3)); |
| | | handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csn1)); |
| | | handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csn2)); |
| | | handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csn3)); |
| | | Thread.sleep(500); |
| | | |
| | | // Checks |
| | | assertEquals(handler.getFirstCNIndexData().getChangeNumber(), cn1); |
| | | assertEquals(handler.getLastCNIndexData().getChangeNumber(), cn3); |
| | | assertEquals(handler.getFirstRecord().getChangeNumber(), cn1); |
| | | assertEquals(handler.getLastRecord().getChangeNumber(), cn3); |
| | | |
| | | assertEquals(handler.count(), 3, "Db count"); |
| | | assertFalse(handler.isEmpty()); |
| | |
| | | handler.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertNull(handler.getFirstCNIndexData()); |
| | | assertNull(handler.getLastCNIndexData()); |
| | | assertNull(handler.getFirstRecord()); |
| | | assertNull(handler.getLastRecord()); |
| | | assertEquals(handler.count(), 0); |
| | | assertTrue(handler.isEmpty()); |
| | | } |
| | |
| | | ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(changeNumber); |
| | | try |
| | | { |
| | | return cursor.getCNIndexData().getPreviousCookie(); |
| | | return cursor.getRecord().getPreviousCookie(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | { |
| | | for (int i = 0; i < sns.length; i++) |
| | | { |
| | | assertEquals(cursor.getCNIndexData().getChangeNumber(), sns[i]); |
| | | assertEquals(cursor.getRecord().getChangeNumber(), sns[i]); |
| | | final boolean isNotLast = i + 1 < sns.length; |
| | | assertEquals(cursor.next(), isNotLast); |
| | | } |