| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | // The threads calling the add() method are blocked while the size of |
| | | // msgQueue is larger than queueMaxSize; above queueHimark the waiter on |
| | | // msgQueue is notified, and it idles again once below queueLowmark. |
| | | int queueHimark = 5000; |
| | | int queueLowmark = 4000; |
| | | int queueMaxSize = 5000; |
| | | int queueLowmark = 1000; |
| | | int queueHimark = 4000; |
| | | |
| | | // The queue maximum, himark and lowmark in bytes; these are initialised |
| | | // to 100 times the corresponding limits in number of updates. |
| | | int queueHimarkBytes = 100 * queueHimark; |
| | | int queueMaxBytes = 100 * queueMaxSize; |
| | | int queueLowmarkBytes = 100 * queueLowmark; |
| | | int queueHimarkBytes = 100 * queueHimark; |
| | | |
| | | // The number of bytes currently in the queue |
| | | int queueByteSize = 0; |
| | |
| | | serverId = id; |
| | | this.baseDn = baseDn; |
| | | trimage = replicationServer.getTrimage(); |
| | | queueHimark = queueSize; |
| | | queueLowmark = queueSize * 4 / 5; |
| | | queueHimarkBytes = 100 * queueHimark; |
| | | queueLowmarkBytes = 100 * queueLowmark; |
| | | // Size thresholds in number of updates: the hard maximum is the |
| | | // configured queue size, the low and high water marks are 1/5 and 4/5 |
| | | // of it. |
| | | queueMaxSize = queueSize; |
| | | queueLowmark = queueSize * 1 / 5; |
| | | queueHimark = queueSize * 4 / 5; |
| | | // Byte thresholds: 200 times the matching threshold in updates. |
| | | queueMaxBytes = 200 * queueMaxSize; |
| | | queueLowmarkBytes = 200 * queueLowmark; |
| | | // Derive the byte himark from queueHimark (not queueLowmark), so that |
| | | // the high and low byte water marks do not collapse to the same value. |
| | | queueHimarkBytes = 200 * queueHimark; |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | while ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | if ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | msgQueue.notify(); |
| | | |
| | | while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes)) |
| | | { |
| | | try |
| | | { |
| | | msgQueue.wait(500); |
| | | msgQueue.wait(5000); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // simply loop to try again. |
| | |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | | try { |
| | | try |
| | | { |
| | | flush(); |
| | | trim(); |
| | | |
| | | synchronized (this) |
| | | synchronized (msgQueue) |
| | | { |
| | | try |
| | | if ((msgQueue.size() < queueLowmark) && |
| | | (queueByteSize < queueLowmarkBytes)) |
| | | { |
| | | this.wait(1000); |
| | | } catch (InterruptedException e) |
| | | { } |
| | | try |
| | | { |
| | | msgQueue.wait(10000); |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | } |
| | | } catch (Exception end) |
| | | { |
| | |
| | | int tries = 0; |
| | | while ((tries++ < DEADLOCK_RETRIES) && (!done)) |
| | | { |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ReplServerDBCursor cursor; |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | | try |
| | | synchronized (flushLock) |
| | | { |
| | | while ((size < 5000 ) && (!finished)) |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ReplServerDBCursor cursor; |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | | try |
| | | { |
| | | ChangeNumber changeNumber = cursor.nextChangeNumber(); |
| | | if (changeNumber != null) |
| | | while ((size < 5000 ) && (!finished)) |
| | | { |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | ChangeNumber changeNumber = cursor.nextChangeNumber(); |
| | | if (changeNumber != null) |
| | | { |
| | | size++; |
| | | cursor.delete(); |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | { |
| | | size++; |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | firstChange = changeNumber; |
| | | finished = true; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | firstChange = changeNumber; |
| | | finished = true; |
| | | } |
| | | } |
| | | else |
| | | finished = true; |
| | | cursor.close(); |
| | | done = true; |
| | | } |
| | | cursor.close(); |
| | | done = true; |
| | | } |
| | | catch (DeadlockException e) |
| | | { |
| | | cursor.abort(); |
| | | if (tries == DEADLOCK_RETRIES) |
| | | catch (DeadlockException e) |
| | | { |
| | | // could not handle the Deadlock after DEADLOCK_RETRIES tries. |
| | | // shutdown the ReplicationServer. |
| | | cursor.abort(); |
| | | if (tries == DEADLOCK_RETRIES) |
| | | { |
| | | // could not handle the Deadlock after DEADLOCK_RETRIES tries. |
| | | // shutdown the ReplicationServer. |
| | | shutdown = true; |
| | | throw (e); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | shutdown = true; |
| | | cursor.abort(); |
| | | throw (e); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | shutdown = true; |
| | | cursor.abort(); |
| | | throw (e); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | private void flush() |
| | | { |
| | | int size; |
| | | int chunksize = (500 < queueHimark ? 500 : queueHimark); |
| | | int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); |
| | | |
| | | do |
| | | { |
| | |
| | | { |
| | | msgQueue.clear(); |
| | | queueByteSize = 0; |
| | | |
| | | db.clear(); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | } |
| | | db.clear(); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | } |
| | | } |