| | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.util.List; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Database; |
| | | import com.sleepycat.je.LockConflictException; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.OperationStatus; |
| | | import com.sleepycat.je.Transaction; |
| | | import com.sleepycat.je.*; |
| | | |
| | | /** |
| | | * This class implements the interface between the underlying database |
| | |
| | | private int serverId; |
| | | private String baseDn; |
| | | |
| | | // The maximum number of retries in case of DatabaseDeadlock Exception. |
| | | private static final int DEADLOCK_RETRIES = 10; |
| | | |
| | | // The lock used to provide exclusive access to the thread that |
| | | // close the db (shutdown or clear). |
| | | private ReentrantReadWriteLock dbCloseLock; |
| | |
| | | */ |
| | | public void addEntries(List<UpdateMsg> changes) |
| | | { |
| | | Transaction txn = null; |
| | | |
| | | try |
| | | { |
| | | int tries = 0; |
| | | boolean done = false; |
| | | |
| | | // The database can return a Deadlock Exception if several threads are |
| | | // accessing the database at the same time. This Exception is a |
| | | // transient state, when it happens the transaction is aborted and |
| | | // the operation is attempted again up to DEADLOCK_RETRIES times. |
| | | while ((tries++ < DEADLOCK_RETRIES) && (!done)) |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | for (UpdateMsg change : changes) |
| | | { |
| | | txn = dbenv.beginTransaction(); |
| | | DatabaseEntry key = new ReplicationKey( |
| | | change.getChangeNumber()); |
| | | DatabaseEntry data = new ReplicationData(change); |
| | | |
| | | for (UpdateMsg change : changes) |
| | | if ((counterCurrValue != 0) |
| | | && (counterCurrValue % counterWindowSize == 0)) |
| | | { |
| | | DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); |
| | | DatabaseEntry data = new ReplicationData(change); |
| | | |
| | | if ((counterCurrValue!=0) && |
| | | (counterCurrValue%counterWindowSize == 0)) |
| | | { |
| | | // enough changes to generate a counter record - wait for the next |
| | | // change fo time |
| | | counterTsLimit = change.getChangeNumber().getTime(); |
| | | } |
| | | if ((counterTsLimit!=0) |
| | | && (change.getChangeNumber().getTime() != counterTsLimit)) |
| | | { |
| | | // Write the counter record |
| | | DatabaseEntry counterKey = new ReplicationKey( |
| | | new ChangeNumber( |
| | | change.getChangeNumber().getTime(), |
| | | 0, 0)); |
| | | DatabaseEntry counterValue = |
| | | encodeCounterValue(counterCurrValue-1); |
| | | db.put(txn, counterKey, counterValue); |
| | | counterTsLimit=0; |
| | | } |
| | | db.put(txn, key, data); |
| | | counterCurrValue++; |
| | | |
| | | // enough changes to generate a counter record - wait for the next |
| | | // change of time |
| | | counterTsLimit = change.getChangeNumber().getTime(); |
| | | } |
| | | txn.commitWriteNoSync(); |
| | | txn = null; |
| | | done = true; |
| | | } |
| | | catch (LockConflictException e) |
| | | { |
| | | // Try again. |
| | | } |
| | | finally |
| | | { |
| | | if (txn != null) |
| | | if ((counterTsLimit != 0) |
| | | && (change.getChangeNumber().getTime() != counterTsLimit)) |
| | | { |
| | | // No effect if txn has committed. |
| | | txn.abort(); |
| | | txn = null; |
| | | // Write the counter record |
| | | DatabaseEntry counterKey = new ReplicationKey( |
| | | new ChangeNumber(change.getChangeNumber().getTime(), |
| | | 0, 0)); |
| | | DatabaseEntry counterValue = |
| | | encodeCounterValue(counterCurrValue - 1); |
| | | db.put(null, counterKey, counterValue); |
| | | counterTsLimit = 0; |
| | | } |
| | | |
| | | dbCloseLock.readLock().unlock(); |
| | | db.put(null, key, data); |
| | | counterCurrValue++; |
| | | } |
| | | } |
| | | if (!done) |
| | | finally |
| | | { |
| | | // Could not write to the DB after DEADLOCK_RETRIES tries. |
| | | // This ReplicationServer is not reliable and will be shutdown. |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | dbCloseLock.readLock().unlock(); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | } |
| | | } |
| | | |
| | |
| | | return new ReplServerDBCursor(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void closeLockedCursor(Cursor cursor) |
| | | throws DatabaseException |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | | if (cursor != null) |
| | | cursor.close(); |
| | | { |
| | | try |
| | | { |
| | | cursor.close(); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | |
| | | String str = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getFirst(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | else |
| | | |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | /* database is faulty */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | |
| | | Cursor cursor = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getLast(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | /* database is empty */ |
| | | return null; |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getLast(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record - don't know how much it |
| | | * can be possible but ... |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | } |
| | | |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | finally |
| | | { |
| | | if (cursor.getPrev(key, data, |
| | | LockMode.DEFAULT) != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record - don't know how much it can |
| | | * be possible but ... |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn= new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | closeLockedCursor(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | |
| | | return cn; |
| | | } |
| | |
| | | DatabaseEntry key = new ReplicationKey(changeNumber); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | Transaction txn = null; |
| | | |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | cursor = db.openCursor(txn, null); |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == |
| | | OperationStatus.SUCCESS) |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | // We can move close to the changeNumber. |
| | | // Let's move to the previous change. |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == |
| | | OperationStatus.SUCCESS) |
| | | cursor = db.openCursor(null, null); |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | // We can move close to the changeNumber. |
| | | // Let's move to the previous change. |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | | { |
| | | if (cursor.getPrev(key, data, |
| | | LockMode.DEFAULT) != OperationStatus.SUCCESS) |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // database starts with a counter record. |
| | | cn = null; |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | // database starts with a counter record. |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | else |
| | | } |
| | | // else, there was no change previous to our changeNumber. |
| | | } |
| | | else |
| | | { |
| | | // We could not move the cursor past to the changeNumber |
| | | // Check if the last change is older than changeNumber |
| | | if (cursor.getLast(key, data, LockMode.DEFAULT) |
| | | == OperationStatus.SUCCESS) |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn= new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) |
| | | != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record, should not be |
| | | * possible, but Ok, let's just say no change Number |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | } |
| | | } |
| | | // else, there was no change previous to our changeNumber. |
| | | } |
| | | else |
| | | finally |
| | | { |
| | | // We could not move the cursor past to the changeNumber |
| | | // Check if the last change is older than changeNumber |
| | | if (cursor.getLast(key, data, LockMode.DEFAULT) == |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, |
| | | LockMode.DEFAULT) != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record, should not be |
| | | * possible, but Ok, let's just say no change Number |
| | | */ |
| | | cn = null; |
| | | } |
| | | else |
| | | { |
| | | str = decodeUTF8(key.getData()); |
| | | cn= new ChangeNumber(str); |
| | | // There can't be 2 counter record next to each other |
| | | } |
| | | } |
| | | } |
| | | closeLockedCursor(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | /* database is faulty */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | // TODO: Verify if shutting down replication is the right thing to do |
| | | replicationServer.shutdown(); |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | cn = null; |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | |
| | | catch (Exception e) |
| | | { |
| | | // Unlocking is required before throwing any exception |
| | | try |
| | | { |
| | | closeLockedCursor(localCursor); |
| | | } |
| | | catch (Exception ignore) |
| | | { |
| | | // Ignore. |
| | | } |
| | | closeLockedCursor(localCursor); |
| | | throw e; |
| | | } |
| | | } |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | try |
| | | { |
| | | closeLockedCursor(localCursor); |
| | | } |
| | | catch (Exception ignore) |
| | | { |
| | | // Ignore. |
| | | } |
| | | closeLockedCursor(localCursor); |
| | | |
| | | if (localTxn != null) |
| | | { |
| | |
| | | isClosed = true; |
| | | } |
| | | |
| | | boolean closeHasFailed = false; |
| | | |
| | | try |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | closeHasFailed = true; |
| | | } |
| | | |
| | | closeLockedCursor(cursor); |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.commit(); |
| | | // No need for durability when purging. |
| | | txn.commit(Durability.COMMIT_NO_SYNC); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | closeHasFailed = true; |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | } |
| | | } |
| | | |
| | | if (closeHasFailed) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | isClosed = true; |
| | | } |
| | | |
| | | boolean closeHasFailed = false; |
| | | |
| | | try |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | catch (LockConflictException e) |
| | | { |
| | | // The DB documentation states that a DeadlockException |
| | | // on the close method of a cursor that is aborting should |
| | | // be ignored. |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | closeHasFailed = true; |
| | | } |
| | | closeLockedCursor(cursor); |
| | | |
| | | if (txn != null) |
| | | { |
| | |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | closeHasFailed = true; |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | } |
| | | } |
| | | |
| | | if (closeHasFailed) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | int distToCounterRecord1 = 0; |
| | | int distBackToCounterRecord2 = 0; |
| | | int count=0; |
| | | Cursor cursor = null; |
| | | Transaction txn = null; |
| | | OperationStatus status; |
| | | try |
| | | { |
| | | ChangeNumber cn ; |
| | | |
| | | if ((start==null)&&(stop==null)) |
| | | return (int)db.count(); |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(txn, null); |
| | | if (start != null) |
| | | Cursor cursor = null; |
| | | try |
| | | { |
| | | key = new ReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | ChangeNumber cn ; |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | String csnString = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(csnString); |
| | | if (cn.getServerId() != 0) |
| | | if ((start==null)&&(stop==null)) |
| | | return (int)db.count(); |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(null, null); |
| | | if (start != null) |
| | | { |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distToCounterRecord1++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // reached the end |
| | | break; |
| | | } |
| | | key = new ReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord1 = decodeCounterValue(data.getData()); |
| | | break; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | } |
| | | cursor.close(); |
| | | |
| | | // cases |
| | | // |
| | | if (counterRecord1==0) |
| | | return distToCounterRecord1; |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | String csnString = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(csnString); |
| | | if (cn.getServerId() != 0) |
| | | { |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distToCounterRecord1++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // reached the end |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord1 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | } |
| | | cursor.close(); |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | txn = null; |
| | | data = new DatabaseEntry(); |
| | | key = new ReplicationKey(stop); |
| | | cursor = db.openCursor(txn, null); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | // cases |
| | | // |
| | | if (counterRecord1==0) |
| | | return distToCounterRecord1; |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | key = new ReplicationKey(stop); |
| | | cursor = db.openCursor(null, null); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | return 0; |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | } |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | else |
| | | { |
| | | // regular change record |
| | | if (!cn.older(start)) |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | distBackToCounterRecord2++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | /* database is empty */ |
| | | return 0; |
| | | } |
| | | } |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // regular change record |
| | | if (!cn.older(start)) |
| | | { |
| | | distBackToCounterRecord2++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord2 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | cursor.close(); |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | if (counterRecord1!=0) |
| | | { |
| | | // counter record |
| | | counterRecord2 = decodeCounterValue(data.getData()); |
| | | break; |
| | | if (counterRecord1 == counterRecord2) |
| | | { |
| | | // only one cp between from and to - no need to use it |
| | | count = distToCounterRecord1 + distBackToCounterRecord2; |
| | | } |
| | | else |
| | | { |
| | | // 2 cp between from and to |
| | | count = distToCounterRecord1 + (counterRecord2-counterRecord1) |
| | | + distBackToCounterRecord2; |
| | | } |
| | | } |
| | | } |
| | | cursor.close(); |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | if (counterRecord1!=0) |
| | | finally |
| | | { |
| | | if (counterRecord1 == counterRecord2) |
| | | if (cursor != null) |
| | | { |
| | | // only one cp between from and to - no need to use it |
| | | count = distToCounterRecord1 + distBackToCounterRecord2; |
| | | } |
| | | else |
| | | { |
| | | // 2 cp between from and to |
| | | count = distToCounterRecord1 + (counterRecord2-counterRecord1) |
| | | + distBackToCounterRecord2; |
| | | cursor.close(); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | catch (DatabaseException e) |
| | | { |
| | | if (cursor != null) |
| | | cursor.close(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The ReplicationServer is shutting down. |
| | | } |
| | | } |
| | | replicationServer.handleUnexpectedDatabaseException(e); |
| | | } |
| | | return count; |
| | | } |