| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | |
| | | private static final int START = 0; |
| | | private static final int STOP = 1; |
| | | |
| | | private Database db = null; |
| | | private ReplicationDbEnv dbenv = null; |
| | | private Database db; |
| | | private ReplicationDbEnv dbenv; |
| | | private ReplicationServer replicationServer; |
| | | private int serverId; |
| | | private String baseDn; |
| | |
| | | // Change counter management |
| | | // The Db itself does not allow to count records between a start and an end |
| | | // change. And we cannot rely on the replication seqnum that is part of the |
| | | // changenumber, since there can be holes (when an operation is canceled). |
| | | // CSN, since there can be holes (when an operation is canceled). |
| | | // And traversing all the records from the start one to the end one works |
| | | // fine but can be very long (ECL:lastChangeNumber). |
| | | // |
| | |
| | | // - a counter value : count of changes since previous counter record. |
| | | // |
| | | // A counter record has to follow the order of the db, so it needs to have |
| | | // a changenumber key that follows the order. |
| | | // A counter record must have its own changenumber key since the Db does not |
| | | // support duplicate keys (it is a compatibility breaker character of the DB). |
| | | // a CSN key that follows the order. |
| | | // A counter record must have its own CSN key since the Db does not support |
| | | // duplicate keys (it is a compatibility breaker character of the DB). |
| | | // |
| | | // We define 2 conditions to store a counter record : |
| | | // 1/- at least 'counterWindowSize' changes have been stored in the Db |
| | |
| | | OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | counterCurrValue = decodeCounterValue(data.getData()) + 1; |
| | | counterTsLimit = cn.getTime(); |
| | | counterTsLimit = csn.getTime(); |
| | | break; |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private static ChangeNumber toChangeNumber(byte[] data) |
| | | private static CSN toCSN(byte[] data) |
| | | { |
| | | return new ChangeNumber(decodeUTF8(data)); |
| | | return new CSN(decodeUTF8(data)); |
| | | } |
| | | |
| | | |
| | |
| | | for (UpdateMsg change : changes) |
| | | { |
| | | final DatabaseEntry key = |
| | | createReplicationKey(change.getChangeNumber()); |
| | | createReplicationKey(change.getCSN()); |
| | | final DatabaseEntry data = new ReplicationData(change); |
| | | |
| | | insertCounterRecordIfNeeded(change.getChangeNumber()); |
| | | insertCounterRecordIfNeeded(change.getCSN()); |
| | | db.put(null, key, data); |
| | | counterCurrValue++; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void insertCounterRecordIfNeeded(ChangeNumber changeNumber) |
| | | throws DatabaseException |
| | | private void insertCounterRecordIfNeeded(CSN csn) throws DatabaseException |
| | | { |
| | | if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0)) |
| | | { |
| | | // enough changes to generate a counter record |
| | | // wait for the next change of time |
| | | counterTsLimit = changeNumber.getTime(); |
| | | counterTsLimit = csn.getTime(); |
| | | } |
| | | if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit) |
| | | if (counterTsLimit != 0 && csn.getTime() != counterTsLimit) |
| | | { |
| | | // Write the counter record |
| | | final ChangeNumber counterRecord = newCounterRecord(changeNumber); |
| | | final CSN counterRecord = newCounterRecord(csn); |
| | | DatabaseEntry counterKey = createReplicationKey(counterRecord); |
| | | DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1); |
| | | db.put(null, counterKey, counterValue); |
| | |
| | | } |
| | | } |
| | | |
| | | private DatabaseEntry createReplicationKey(ChangeNumber changeNumber) |
| | | private DatabaseEntry createReplicationKey(CSN csn) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | try |
| | | { |
| | | key.setData(changeNumber.toString().getBytes("UTF-8")); |
| | | key.setData(csn.toString().getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | * Create a cursor that can be used to search or iterate on this |
| | | * ReplicationServer DB. |
| | | * |
| | | * @param changeNumber The ChangeNumber from which the cursor must start. |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. |
| | | * @throws ChangelogException |
| | | * When a problem occurs or the startingChangeNumber does not exist. |
| | | * When a problem occurs or the startCSN does not exist. |
| | | * @return The ReplServerDBCursor. |
| | | */ |
| | | public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber) |
| | | public ReplServerDBCursor openReadCursor(CSN startCSN) |
| | | throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(changeNumber); |
| | | return new ReplServerDBCursor(startCSN); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Read the first Change from the database. |
| | | * @return the first ChangeNumber. |
| | | * |
| | | * @return the first CSN. |
| | | */ |
| | | public ChangeNumber readFirstChange() |
| | | public CSN readFirstChange() |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | |
| | | return null; |
| | | } |
| | | |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (!isACounterRecord(csn)) |
| | | { |
| | | return cn; |
| | | return csn; |
| | | } |
| | | |
| | | // First record is a counter record .. go next |
| | |
| | | } |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return this record |
| | | return toChangeNumber(key.getData()); |
| | | return toCSN(key.getData()); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | /** |
| | | * Read the last Change from the database. |
| | | * |
| | | * @return the last ChangeNumber. |
| | | * @return the last CSN. |
| | | */ |
| | | public ChangeNumber readLastChange() |
| | | public CSN readLastChange() |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | |
| | | return null; |
| | | } |
| | | |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (!isACounterRecord(csn)) |
| | | { |
| | | return cn; |
| | | return csn; |
| | | } |
| | | |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | |
| | | } |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return this record |
| | | return toChangeNumber(key.getData()); |
| | | return toCSN(key.getData()); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Try to find in the DB, the change number right before the one |
| | | * passed as a parameter. |
| | | * Try to find in the DB, the CSN right before the one passed as a parameter. |
| | | * |
| | | * @param changeNumber |
| | | * The changeNumber from which we start searching. |
| | | * @return the changeNumber right before the one passed as a parameter. |
| | | * Can return null if there is none. |
| | | * @param csn |
| | | * The CSN from which we start searching. |
| | | * @return the CSN right before the one passed as a parameter. Can return null |
| | | * if there is none. |
| | | */ |
| | | public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber) |
| | | public CSN getPreviousCSN(CSN csn) |
| | | { |
| | | if (changeNumber == null) |
| | | if (csn == null) |
| | | { |
| | | return null; |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | DatabaseEntry key = createReplicationKey(changeNumber); |
| | | DatabaseEntry key = createReplicationKey(csn); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(null, null); |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | // We can move close to the changeNumber. |
| | | // We can move close to the CSN. |
| | | // Let's move to the previous change. |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | return getRegularRecord(cursor, key, data); |
| | | } |
| | | // else, there was no change previous to our changeNumber. |
| | | // else, there was no change previous to our CSN. |
| | | } |
| | | else |
| | | { |
| | | // We could not move the cursor past to the changeNumber |
| | | // Check if the last change is older than changeNumber |
| | | // We could not move the cursor past to the CSN |
| | | // Check if the last change is older than CSN |
| | | if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | return getRegularRecord(cursor, key, data); |
| | |
| | | return null; |
| | | } |
| | | |
| | | private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key, |
| | | private CSN getRegularRecord(Cursor cursor, DatabaseEntry key, |
| | | DatabaseEntry data) throws DatabaseException |
| | | { |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (!isACounterRecord(csn)) |
| | | { |
| | | return cn; |
| | | return csn; |
| | | } |
| | | |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return previous record which must exist |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | return toChangeNumber(key.getData()); |
| | | return toCSN(key.getData()); |
| | | } |
| | | |
| | | // database only contain a counter record, which should not be possible |
| | | // let's just say no changeNumber |
| | | // let's just say no CSN |
| | | return null; |
| | | } |
| | | |
| | |
| | | * Creates a ReplServerDBCursor that can be used for browsing a |
| | | * replicationServer db. |
| | | * |
| | | * @param startingChangeNumber |
| | | * The ChangeNumber from which the cursor must start. |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. |
| | | * @throws ChangelogException |
| | | * When the startingChangeNumber does not exist. |
| | | * When the startCSN does not exist. |
| | | */ |
| | | private ReplServerDBCursor(ChangeNumber startingChangeNumber) |
| | | throws ChangelogException |
| | | private ReplServerDBCursor(CSN startCSN) throws ChangelogException |
| | | { |
| | | if (startingChangeNumber != null) |
| | | if (startCSN != null) |
| | | { |
| | | key = createReplicationKey(startingChangeNumber); |
| | | key = createReplicationKey(startCSN); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | |
| | | localCursor = db.openCursor(txn, null); |
| | | if (startingChangeNumber != null) |
| | | if (startCSN != null) |
| | | { |
| | | if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | // We could not move the cursor to the expected startingChangeNumber |
| | | // We could not move the cursor to the expected startCSN |
| | | if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS) |
| | | { |
| | | // We could not even move the cursor closed to it => failure |
| | | throw new ChangelogException( |
| | | Message.raw("ChangeNumber not available")); |
| | | // We could not even move the cursor close to it => failure |
| | | throw new ChangelogException(Message.raw("CSN not available")); |
| | | } |
| | | |
| | | // We can move close to the startingChangeNumber. |
| | | // We can move close to the startCSN. |
| | | // Let's create a cursor from that point. |
| | | DatabaseEntry aKey = new DatabaseEntry(); |
| | | DatabaseEntry aData = new DatabaseEntry(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the next ChangeNumber in the database from this Cursor. |
| | | * Get the next CSN in the database from this Cursor. |
| | | * |
| | | * @return The next ChangeNumber in the database from this cursor. |
| | | * @throws ChangelogException In case of underlying database problem. |
| | | * @return The next CSN in the database from this cursor. |
| | | * @throws ChangelogException |
| | | * In case of underlying database problem. |
| | | */ |
| | | public ChangeNumber nextChangeNumber() throws ChangelogException |
| | | public CSN nextCSN() throws ChangelogException |
| | | { |
| | | if (isClosed) |
| | | { |
| | |
| | | { |
| | | return null; |
| | | } |
| | | return toChangeNumber(key.getData()); |
| | | return toCSN(key.getData()); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | ChangeNumber cn = null; |
| | | CSN csn = null; |
| | | try |
| | | { |
| | | cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | continue; |
| | | } |
| | |
| | | */ |
| | | Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD |
| | | .get(replicationServer.getServerId(), |
| | | (cn == null ? "" : cn.toString()), |
| | | (csn == null ? "" : csn.toString()), |
| | | e.getMessage()); |
| | | logError(message); |
| | | } |
| | |
| | | * Count the number of changes between 2 changes numbers (inclusive). |
| | | * @param start The lower limit of the count. |
| | | * @param stop The higher limit of the count. |
| | | * @return The number of changes between provided start and stop changeNumber. |
| | | * @return The number of changes between provided start and stop CSN. |
| | | * Returns 0 when an error occurs. |
| | | */ |
| | | public long count(ChangeNumber start, ChangeNumber stop) |
| | | public long count(CSN start, CSN stop) |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | |
| | | } |
| | | |
| | | |
| | | private void findFirstCounterRecordAfterStartPoint(ChangeNumber start, |
| | | ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords) |
| | | private void findFirstCounterRecordAfterStartPoint(CSN start, |
| | | CSN stop, int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[START] = decodeCounterValue(data.getData()); |
| | |
| | | |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | if (!csn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distanceToCounterRecords[START]++; |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start, |
| | | ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords) |
| | | private boolean findFirstCounterRecordBeforeStopPoint(CSN start, |
| | | CSN stop, int[] counterValues, int[] distanceToCounterRecords) |
| | | throws DatabaseException |
| | | { |
| | | Cursor cursor = db.openCursor(null, null); |
| | |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[STOP] = decodeCounterValue(data.getData()); |
| | |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (!cn.older(start)) |
| | | if (!csn.older(start)) |
| | | { |
| | | distanceToCounterRecords[STOP]++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | |
| | | |
| | | /** |
| | | * The diagram below shows a visual description of how the distance between |
| | | * two change numbers in the database is computed. |
| | | * two CSNs in the database is computed. |
| | | * |
| | | * <pre> |
| | | * +--------+ +--------+ |
| | |
| | | * Explanation of the terms used: |
| | | * <dl> |
| | | * <dt>START</dt> |
| | | * <dd>Start change number for the count</dd> |
| | | * <dd>Start CSN for the count</dd> |
| | | * <dt>STOP</dt> |
| | | * <dd>Stop change number for the count</dd> |
| | | * <dd>Stop CSN for the count</dd> |
| | | * <dt>dist</dt> |
| | | * <dd>Distance from START (or STOP) to the counter record</dd> |
| | | * <dt>CSN</dt> |
| | |
| | | * database is ordered.</dd> |
| | | * <dt>CR</dt> |
| | | * <dd>Stands for "Counter Record". Counter Records are inserted in the |
| | | * database along with real change numbers, but they are not real changes. |
| | | * They are only used to speed up calculating the distance between 2 change |
| | | * numbers without the need to scan the whole database in between.</dd> |
| | | * database along with real CSNs, but they are not real changes. They are only |
| | | * used to speed up calculating the distance between 2 CSNs without the need |
| | | * to scan the whole database in between.</dd> |
| | | * </dl> |
| | | */ |
| | | private long computeDistance(int[] counterValues, |
| | |
| | | } |
| | | |
| | | /** |
| | | * Whether a provided changeNumber represents a counter record. A counter |
| | | * record is used to store TODO. |
| | | * Whether a provided CSN represents a counter record. A counter record is |
| | | * used to store the time. |
| | | * |
| | | * @param cn |
| | | * The changeNumber to test |
| | | * @return true if the provided changenumber is a counter, false otherwise |
| | | * @param csn |
| | | * The CSN to test |
| | | * @return true if the provided CSN is a counter, false otherwise |
| | | */ |
| | | private static boolean isACounterRecord(ChangeNumber cn) |
| | | private static boolean isACounterRecord(CSN csn) |
| | | { |
| | | return cn.getServerId() == 0 && cn.getSeqnum() == 0; |
| | | return csn.getServerId() == 0 && csn.getSeqnum() == 0; |
| | | } |
| | | |
| | | private static ChangeNumber newCounterRecord(ChangeNumber changeNumber) |
| | | private static CSN newCounterRecord(CSN csn) |
| | | { |
| | | return new ChangeNumber(changeNumber.getTime(), 0, 0); |
| | | return new CSN(csn.getTime(), 0, 0); |
| | | } |
| | | |
| | | /** |