| | |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.LockConflictException; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | // The maximum number of retries in case of DatabaseDeadlock Exception. |
| | | private static final int DEADLOCK_RETRIES = 10; |
| | | |
| | | private long latestTrimDate = 0; |
| | | |
| | | /** |
| | |
| | | * are older than this age are removed. |
| | | * |
| | | */ |
| | | private long trimage; |
| | | private long trimAge; |
| | | |
| | | /** |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | |
| | | this.replicationServer = replicationServer; |
| | | serverId = id; |
| | | this.baseDn = baseDn; |
| | | trimage = replicationServer.getTrimage(); |
| | | trimAge = replicationServer.getTrimAge(); |
| | | queueMaxSize = queueSize; |
| | | queueLowmark = queueSize * 1 / 5; |
| | | queueHimark = queueSize * 4 / 5; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Counts the changes stored between two change numbers. |
| | | * <p> |
| | | * {@code flush()} is called first, then a read cursor is opened at |
| | | * {@code from} and every change number it returns is counted until one |
| | | * that is newer than or equal to {@code to} is met; that one is not |
| | | * counted. Whether {@code from} itself is counted depends on where |
| | | * {@code openReadCursor} positions the cursor. |
| | | * |
| | | * @param from The lower (older) change number. |
| | | * @param to The upper (newer) change number. |
| | | * @return The computed number of changes, or 0 when the read cursor |
| | | * cannot be opened. |
| | | */ |
| | | public int traverseAndCount(ChangeNumber from, ChangeNumber to) |
| | | { |
| | | flush(); |
| | | |
| | | final ReplServerDBCursor cursor; |
| | | try |
| | | { |
| | | cursor = db.openReadCursor(from); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // No cursor could be opened: report that nothing was counted. |
| | | return 0; |
| | | } |
| | | |
| | | int total = 0; |
| | | try |
| | | { |
| | | // Stop at the end of the cursor or at the first change reaching "to". |
| | | for (ChangeNumber current = cursor.nextChangeNumber(); |
| | | current != null && !current.newerOrEquals(to); |
| | | current = cursor.nextChangeNumber()) |
| | | { |
| | | total++; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // The cursor is always released through abort(), even on failure. |
| | | if (cursor != null) |
| | | { |
| | | cursor.abort(); |
| | | } |
| | | } |
| | | return total; |
| | | } |
| | | |
| | | /** |
| | | * Removes the provided number of messages from the beginning of the msgQueue. |
| | | * |
| | | * @param number the number of changes to be removed. |
| | |
| | | */ |
| | | private void trim() throws DatabaseException, Exception |
| | | { |
| | | if (trimage == 0) |
| | | if (trimAge == 0) |
| | | { |
| | | return; |
| | | int size = 0; |
| | | boolean finished = false; |
| | | boolean done = false; |
| | | } |
| | | |
| | | latestTrimDate = TimeThread.getTime() - trimage; |
| | | latestTrimDate = TimeThread.getTime() - trimAge; |
| | | |
| | | ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0); |
| | | |
| | | // Find the last changeNumber before the trimDate, in the Database. |
| | | ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate); |
| | | ChangeNumber lastBeforeTrimDate = db |
| | | .getPreviousChangeNumber(trimDate); |
| | | if (lastBeforeTrimDate != null) |
| | | { |
| | | // If we found it, we want to stop trimming when reaching it. |
| | | trimDate = lastBeforeTrimDate; |
| | | } |
| | | // In case of deadlock detection by the Database, this thread can |
| | | // be aborted by a DeadlockException. This is a transient error and |
| | | // the transaction should be attempted again. |
| | | // We will try DEADLOCK_RETRIES times before failing. |
| | | int tries = 0; |
| | | while ((tries++ < DEADLOCK_RETRIES) && (!done)) |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | synchronized (flushLock) |
| | | { |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | /* |
| | | * the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ReplServerDBCursor cursor = db.openDeleteCursor(); |
| | | |
| | | final ReplServerDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | while ((size < 5000 ) && (!finished)) |
| | | for (int j = 0; j < 50; j++) |
| | | { |
| | | ChangeNumber changeNumber = cursor.nextChangeNumber(); |
| | | if (changeNumber != null) |
| | | if (changeNumber == null) |
| | | { |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | { |
| | | size++; |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | firstChange = changeNumber; |
| | | finished = true; |
| | | } |
| | | cursor.close(); |
| | | done = true; |
| | | return; |
| | | } |
| | | |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | finished = true; |
| | | { |
| | | firstChange = changeNumber; |
| | | cursor.close(); |
| | | done = true; |
| | | return; |
| | | } |
| | | } |
| | | cursor.close(); |
| | | done = true; |
| | | } |
| | | catch (LockConflictException e) |
| | | { |
| | | cursor.abort(); |
| | | if (tries == DEADLOCK_RETRIES) |
| | | { |
| | | // could not handle the Deadlock after DEADLOCK_RETRIES tries. |
| | | // shutdown the ReplicationServer. |
| | | shutdown = true; |
| | | throw (e); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | shutdown = true; |
| | | cursor.abort(); |
| | | throw (e); |
| | | shutdown = true; |
| | | throw e; |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | // Both spellings of the trim-age field receive the same new delay. |
| | | this.trimage = delay; |
| | | this.trimAge = delay; |
| | | } |
| | | |
| | | /** |