| | |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.DeadlockException; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | |
| | | final static int MSG_QUEUE_HIMARK = 5000; |
| | | final static int MSG_QUEUE_LOWMARK = 4000; |
| | | |
| | | // The maximum number of retries in case of DatabaseDeadlock Exception. |
| | | private static final int DEADLOCK_RETRIES = 10; |
| | | |
| | | /** |
| | | * |
| | | * The trim age in milliseconds. Changes record in the change DB that |
| | |
| | | } |
| | | } |
| | | |
| | | return new ReplicationIterator(serverId, db, changeNumber); |
| | | ReplicationIterator it = |
| | | new ReplicationIterator(serverId, db, changeNumber); |
| | | |
| | | return it; |
| | | } |
| | | |
| | | /** |
| | |
| | | return; |
| | | int size = 0; |
| | | boolean finished = false; |
| | | boolean done = false; |
| | | ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage, |
| | | (short) 0, (short)0); |
| | | |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ReplServerDBCursor cursor; |
| | | // In case of deadlock detection by the Database, this thread can |
| | | // by aborted by a DeadlockException. This is a transient error and |
| | | // the transaction should be attempted again. |
| | | // We will try DEADLOCK_RETRIES times before failing. |
| | | int tries = 0; |
| | | while ((tries++ < DEADLOCK_RETRIES) && (!done)) |
| | | { |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ReplServerDBCursor cursor; |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | | try { |
| | | while ((size < 5000 ) && (!finished)) |
| | | try |
| | | { |
| | | ChangeNumber changeNumber = cursor.nextChangeNumber(); |
| | | if (changeNumber != null) |
| | | while ((size < 5000 ) && (!finished)) |
| | | { |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | ChangeNumber changeNumber = cursor.nextChangeNumber(); |
| | | if (changeNumber != null) |
| | | { |
| | | size++; |
| | | cursor.delete(); |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | { |
| | | size++; |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | firstChange = changeNumber; |
| | | finished = true; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | firstChange = changeNumber; |
| | | finished = true; |
| | | } |
| | | } |
| | | else |
| | | finished = true; |
| | | cursor.close(); |
| | | done = true; |
| | | } |
| | | |
| | | cursor.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | shutdown = true; |
| | | cursor.close(); |
| | | throw (e); |
| | | catch (DeadlockException e) |
| | | { |
| | | cursor.abort(); |
| | | if (tries == DEADLOCK_RETRIES) |
| | | { |
| | | // could not handle the Deadlock after DEADLOCK_RETRIES tries. |
| | | // shutdown the ReplicationServer. |
| | | shutdown = true; |
| | | throw (e); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | shutdown = true; |
| | | cursor.abort(); |
| | | throw (e); |
| | | } |
| | | } |
| | | } |
| | | |