| | |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.List; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | |
| | | */ |
| | | public class ReplicationDB |
| | | { |
| | | private static final int START = 0; |
| | | private static final int STOP = 1; |
| | | |
| | | private Database db = null; |
| | | private ReplicationDbEnv dbenv = null; |
| | | private ReplicationServer replicationServer; |
| | |
| | | * The lock used to provide exclusive access to the thread that close the db |
| | | * (shutdown or clear). |
| | | */ |
| | | private ReentrantReadWriteLock dbCloseLock; |
| | | private final ReentrantReadWriteLock dbCloseLock = |
| | | new ReentrantReadWriteLock(true); |
| | | |
| | | // Change counter management |
| | | // The Db itself does not allow to count records between a start and an end |
| | |
| | | // - a counter value : count of changes since previous counter record. |
| | | // |
| | | // A counter record has to follow the order of the db, so it needs to have |
| | | // a changenumber key that follow the order. |
| | | // a changenumber key that follows the order. |
| | | // A counter record must have its own changenumber key since the Db does not |
| | | // support duplicate key (it is a compatibility breaker character of the DB). |
| | | // support duplicate keys (it is a compatibility breaker character of the DB). |
| | | // |
| | | // We define 2 conditions to store a counter record : |
| | | // 1/- at least 'counterWindowSize' changes have been stored in the Db |
| | |
| | | |
| | | |
| | | /** Current value of the counter. */ |
| | | private int counterCurrValue = 1; |
| | | private int counterCurrValue = 1; |
| | | |
| | | /** |
| | | * When not null, the next change with a ts different from |
| | |
| | | * The counter record will never be written to the db more often than each |
| | | * counterWindowSize changes. |
| | | */ |
| | | private int counterWindowSize = 1000; |
| | | private int counterWindowSize = 1000; |
| | | |
| | | /** |
| | | * Creates a new database or open existing database that will be used |
| | |
| | | this.replicationServer = replicationServer; |
| | | |
| | | // Get or create the associated ReplicationServerDomain and Db. |
| | | db = dbenv.getOrAddDb(serverId, baseDn, |
| | | replicationServer.getReplicationServerDomain(baseDn, |
| | | true).getGenerationId()); |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId()); |
| | | |
| | | dbCloseLock = new ReentrantReadWriteLock(true); |
| | | |
| | | Cursor cursor; |
| | | Transaction txn = null; |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status; |
| | | int distBackToCounterRecord = 0; |
| | | intializeCounters(); |
| | | } |
| | | |
| | | // Initialize counter |
| | | private void intializeCounters() |
| | | { |
| | | this.counterCurrValue = 1; |
| | | cursor = db.openCursor(txn, null); |
| | | |
| | | Cursor cursor = db.openCursor(null, null); |
| | | try |
| | | { |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | int distBackToCounterRecord = 0; |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | { |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | distBackToCounterRecord++; |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterCurrValue = decodeCounterValue(data.getData()) + 1; |
| | | counterTsLimit = cn.getTime(); |
| | | break; |
| | | } |
| | | |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | distBackToCounterRecord++; |
| | | } |
| | | counterCurrValue += distBackToCounterRecord; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private static ChangeNumber toChangeNumber(byte[] data) |
| | | { |
| | | return new ChangeNumber(decodeUTF8(data)); |
| | | } |
| | | |
| | | |
| | | /** |
| | |
| | | |
| | | for (UpdateMsg change : changes) |
| | | { |
| | | DatabaseEntry key = new ReplicationKey( |
| | | change.getChangeNumber()); |
| | | DatabaseEntry data = new ReplicationData(change); |
| | | final DatabaseEntry key = |
| | | createReplicationKey(change.getChangeNumber()); |
| | | final DatabaseEntry data = new ReplicationData(change); |
| | | |
| | | if ((counterCurrValue != 0) |
| | | && (counterCurrValue % counterWindowSize == 0)) |
| | | { |
| | | // enough changes to generate a counter record - wait for the next |
| | | // change of time |
| | | counterTsLimit = change.getChangeNumber().getTime(); |
| | | } |
| | | if ((counterTsLimit != 0) |
| | | && (change.getChangeNumber().getTime() != counterTsLimit)) |
| | | { |
| | | // Write the counter record |
| | | DatabaseEntry counterKey = new ReplicationKey( |
| | | new ChangeNumber(change.getChangeNumber().getTime(), |
| | | 0, 0)); |
| | | DatabaseEntry counterValue = |
| | | encodeCounterValue(counterCurrValue - 1); |
| | | db.put(null, counterKey, counterValue); |
| | | counterTsLimit = 0; |
| | | } |
| | | insertCounterRecordIfNeeded(change.getChangeNumber()); |
| | | db.put(null, key, data); |
| | | counterCurrValue++; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void insertCounterRecordIfNeeded(ChangeNumber changeNumber) |
| | | { |
| | | if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0)) |
| | | { |
| | | // enough changes to generate a counter record |
| | | // wait for the next change of time |
| | | counterTsLimit = changeNumber.getTime(); |
| | | } |
| | | if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit) |
| | | { |
| | | // Write the counter record |
| | | final ChangeNumber counterRecord = newCounterRecord(changeNumber); |
| | | DatabaseEntry counterKey = createReplicationKey(counterRecord); |
| | | DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1); |
| | | db.put(null, counterKey, counterValue); |
| | | counterTsLimit = 0; |
| | | } |
| | | } |
| | | |
| | | private DatabaseEntry createReplicationKey(ChangeNumber changeNumber) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | try |
| | | { |
| | | key.setData(changeNumber.toString().getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // Should never happens, UTF-8 is always supported |
| | | // TODO : add better logging |
| | | } |
| | | return key; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the database. |
| | |
| | | |
| | | |
| | | |
| | | private void closeLockedCursor(Cursor cursor) |
| | | throws DatabaseException |
| | | private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | |
| | | public ChangeNumber readFirstChange() |
| | | { |
| | | Cursor cursor = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getFirst(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode defaultMode = LockMode.DEFAULT; |
| | | if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | // database is empty |
| | | return null; |
| | | } |
| | | |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | { |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | // First record is a counter record .. go next |
| | | if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return this record |
| | | return toChangeNumber(key.getData()); |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | /* database is faulty */ |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | return null; |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | |
| | |
| | | public ChangeNumber readLastChange() |
| | | { |
| | | Cursor cursor = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getLast(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode defaultMode = LockMode.DEFAULT; |
| | | if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | // database is empty |
| | | return null; |
| | | } |
| | | |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record - don't know how much it |
| | | * can be possible but ... |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record - don't know how much it can |
| | | * be possible but ... |
| | | */ |
| | | return null; |
| | | } |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return this record |
| | | return toChangeNumber(key.getData()); |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | return null; |
| | | } |
| | | |
| | | return cn; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | Cursor cursor = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | DatabaseEntry key = new ReplicationKey(changeNumber); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | DatabaseEntry key = createReplicationKey(changeNumber); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(null, null); |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | // database starts with a counter record. |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | return getRegularRecord(cursor, key, data); |
| | | } |
| | | // else, there was no change previous to our changeNumber. |
| | | } |
| | |
| | | if (cursor.getLast(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record, should not be |
| | | * possible, but Ok, let's just say no change Number |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | return getRegularRecord(cursor, key, data); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | } |
| | | return cn; |
| | | return null; |
| | | } |
| | | |
| | | private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key, |
| | | DatabaseEntry data) |
| | | { |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (!isACounterRecord(cn)) |
| | | { |
| | | return cn; |
| | | } |
| | | |
| | | // There cannot be 2 counter record next to each other, |
| | | // it is safe to return previous record which must exist |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) |
| | | { |
| | | return toChangeNumber(key.getData()); |
| | | } |
| | | |
| | | // database only contain a counter record, which should not be possible |
| | | // let's just say no changeNumber |
| | | return null; |
| | | } |
| | | |
| | | |
| | |
| | | { |
| | | if (startingChangeNumber != null) |
| | | { |
| | | key = new ReplicationKey(startingChangeNumber); |
| | | key = createReplicationKey(startingChangeNumber); |
| | | } |
| | | else |
| | | { |
| | |
| | | catch (Exception e) |
| | | { |
| | | // Unlocking is required before throwing any exception |
| | | closeLockedCursor(localCursor); |
| | | closeAndReleaseReadLock(localCursor); |
| | | throw e; |
| | | } |
| | | } |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | closeLockedCursor(localCursor); |
| | | closeAndReleaseReadLock(localCursor); |
| | | |
| | | if (localTxn != null) |
| | | { |
| | |
| | | isClosed = true; |
| | | } |
| | | |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | |
| | | if (txn != null) |
| | | { |
| | | try |
| | |
| | | isClosed = true; |
| | | } |
| | | |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | |
| | | if (txn != null) |
| | | { |
| | |
| | | } |
| | | |
| | | OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | return null; |
| | | } |
| | | String csnString = decodeUTF8(key.getData()); |
| | | return new ChangeNumber(csnString); |
| | | return toChangeNumber(key.getData()); |
| | | } |
| | | |
| | | /** |
| | |
| | | ChangeNumber cn = null; |
| | | try |
| | | { |
| | | cn = new ChangeNumber( |
| | | decodeUTF8(key.getData())); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | { |
| | | // counter record |
| | | continue; |
| | | } |
| | | currentChange = ReplicationData.generateChange(data |
| | | .getData()); |
| | | currentChange = ReplicationData.generateChange(data.getData()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | dbenv.clearDb(dbName); |
| | | |
| | | // RE-create the db |
| | | db = dbenv.getOrAddDb(serverId, baseDn, (long)-1); |
| | | db = dbenv.getOrAddDb(serverId, baseDn, -1); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | * @return The number of changes between provided start and stop changeNumber. |
| | | * Returns 0 when an error occurs. |
| | | */ |
| | | public int count(ChangeNumber start, ChangeNumber stop) |
| | | public long count(ChangeNumber start, ChangeNumber stop) |
| | | { |
| | | int counterRecord1 = 0; |
| | | int counterRecord2 = 0; |
| | | int distToCounterRecord1 = 0; |
| | | int distBackToCounterRecord2 = 0; |
| | | int count=0; |
| | | OperationStatus status; |
| | | |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | |
| | | { |
| | | return 0; |
| | | } |
| | | if (start == null && stop == null) |
| | | { |
| | | return db.count(); |
| | | } |
| | | |
| | | ChangeNumber cn ; |
| | | |
| | | if ((start==null)&&(stop==null)) |
| | | return (int)db.count(); |
| | | int[] counterValues = new int[2]; |
| | | int[] distanceToCounterRecords = new int[2]; |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(null, null); |
| | | if (start != null) |
| | | { |
| | | key = new ReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | String csnString = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(csnString); |
| | | if (cn.getServerId() != 0) |
| | | { |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distToCounterRecord1++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // reached the end |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord1 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | } |
| | | findFirstCounterRecordAfterStartPoint(start, stop, cursor, |
| | | counterValues, distanceToCounterRecords); |
| | | cursor.close(); |
| | | |
| | | // cases |
| | | // |
| | | if (counterRecord1==0) |
| | | return distToCounterRecord1; |
| | | if (counterValues[START] == 0) |
| | | return distanceToCounterRecords[START]; |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | data = new DatabaseEntry(); |
| | | key = new ReplicationKey(stop); |
| | | cursor = db.openCursor(null, null); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.SUCCESS) |
| | | if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor, |
| | | counterValues, distanceToCounterRecords)) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | return 0; |
| | | } |
| | | } |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // regular change record |
| | | if (!cn.older(start)) |
| | | { |
| | | distBackToCounterRecord2++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord2 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | // database is empty |
| | | return 0; |
| | | } |
| | | cursor.close(); |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | if (counterRecord1!=0) |
| | | { |
| | | if (counterRecord1 == counterRecord2) |
| | | { |
| | | // only one cp between from and to - no need to use it |
| | | count = distToCounterRecord1 + distBackToCounterRecord2; |
| | | } |
| | | else |
| | | { |
| | | // 2 cp between from and to |
| | | count = distToCounterRecord1 + (counterRecord2-counterRecord1) |
| | | + distBackToCounterRecord2; |
| | | } |
| | | } |
| | | return computeDistance(counterValues, distanceToCounterRecords); |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | closeAndReleaseReadLock(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | } |
| | | return count; |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | private void findFirstCounterRecordAfterStartPoint(ChangeNumber start, |
| | | ChangeNumber stop, Cursor cursor, int[] counterValues, |
| | | int[] distanceToCounterRecords) |
| | | { |
| | | OperationStatus status; |
| | | DatabaseEntry key; |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (start != null) |
| | | { |
| | | key = createReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[START] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distanceToCounterRecords[START]++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // reached the end |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start, |
| | | ChangeNumber stop, Cursor cursor, int[] counterValues, |
| | | int[] distanceToCounterRecords) |
| | | { |
| | | DatabaseEntry key = createReplicationKey(stop); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | final ChangeNumber cn = toChangeNumber(key.getData()); |
| | | if (isACounterRecord(cn)) |
| | | { |
| | | // we have found the counter record |
| | | counterValues[STOP] = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | |
| | | // it is a regular change record |
| | | if (!cn.older(start)) |
| | | { |
| | | distanceToCounterRecords[STOP]++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | break; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Test if a provided changeNumber represents a counter record. |
| | | * @param cn The provided changeNumber. |
| | | * @return True if the provided changenumber is a counter. |
| | | * The diagram below shows a visual description of how the distance between |
| | | * two change numbers in the database is computed. |
| | | * |
| | | * <pre> |
| | | * +--------+ +--------+ |
| | | * | CASE 1 | | CASE 2 | |
| | | * +--------+ +--------+ |
| | | * |
| | | * CSN CSN |
| | | * ----- ----- |
| | | * START => ----- START => ----- |
| | | * ^ ----- ^ ----- |
| | | * | ----- | ----- |
| | | * dist 1 ----- dist 1 ----- |
| | | * | ----- | ----- |
| | | * v ----- v ----- |
| | | * CR 1&2 => [1000] CR 1 => [1000] |
| | | * ^ ----- ----- |
| | | * | ----- ----- |
| | | * dist 2 ----- ----- |
| | | * | ----- ----- |
| | | * v ----- ----- |
| | | * STOP => ----- ----- |
| | | * ----- ----- |
| | | * CR => [2000] CR 2 => [2000] |
| | | * ----- ^ ----- |
| | | * | ----- |
| | | * dist 2 ----- |
| | | * | ----- |
| | | * v ----- |
| | | * STOP => ----- |
| | | * </pre> |
| | | * |
| | | * Explanation of the terms used: |
| | | * <dl> |
| | | * <dt>START</dt> |
| | | * <dd>Start change number for the count</dd> |
| | | * <dt>STOP</dt> |
| | | * <dd>Stop change number for the count</dd> |
| | | * <dt>dist</dt> |
| | | * <dd>Distance from START (or STOP) to the counter record</dd> |
| | | * <dt>CSN</dt> |
| | | * <dd>Stands for "Change Sequence Number". Below it, the database is |
| | | * symbolized, where each record is represented by using dashes "-----". The |
| | | * database is ordered.</dd> |
| | | * <dt>CR</dt> |
| | | * <dd>Stands for "Counter Record". Counter Records are inserted in the |
| | | * database along with real change numbers, but they are not real changes. |
| | | * They are only used to speed up calculating the distance between 2 change |
| | | * numbers without the need to scan the whole database in between.</dd> |
| | | * </dl> |
| | | */ |
| | | static private boolean isaCounter(ChangeNumber cn) |
| | | private long computeDistance(int[] counterValues, |
| | | int[] distanceToCounterRecords) |
| | | { |
| | | return ((cn.getServerId()== 0) && (cn.getSeqnum()==0)); |
| | | if (counterValues[START] != 0) |
| | | { |
| | | if (counterValues[START] == counterValues[STOP]) |
| | | { |
| | | // only one counter record between from and to - no need to use it |
| | | return distanceToCounterRecords[START] + distanceToCounterRecords[STOP]; |
| | | } |
| | | // at least 2 counter records between from and to |
| | | return distanceToCounterRecords[START] |
| | | + (counterValues[STOP] - counterValues[START]) |
| | | + distanceToCounterRecords[STOP]; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Whether a provided changeNumber represents a counter record. A counter |
| | | * record is used to store TODO. |
| | | * |
| | | * @param cn |
| | | * The changeNumber to test |
| | | * @return true if the provided changenumber is a counter, false otherwise |
| | | */ |
| | | private static boolean isACounterRecord(ChangeNumber cn) |
| | | { |
| | | return cn.getServerId() == 0 && cn.getSeqnum() == 0; |
| | | } |
| | | |
| | | private static ChangeNumber newCounterRecord(ChangeNumber changeNumber) |
| | | { |
| | | return new ChangeNumber(changeNumber.getTime(), 0, 0); |
| | | } |
| | | |
| | | /** |