| | |
| | | // |
| | | private final LinkedList<UpdateMessage> msgQueue = |
| | | new LinkedList<UpdateMessage>(); |
| | | |
| | | // The High and low water mark for the max size of the msgQueue. |
| | | // the threads calling add() method will be blocked if the size of |
| | | // msgQueue becomes larger than the queueHimark and will resume |
| | | // only when the size of the msgQueue goes below queueLowmark. |
| | | int queueHimark = 5000; |
| | | int queueLowmark = 4000; |
| | | |
| | | // The queue himark and lowmark in bytes, this is set to 100 times the |
| | | // himark and lowmark in number of updates. |
| | | int queueHimarkBytes = 100 * queueHimark; |
| | | int queueLowmarkBytes = 100 * queueLowmark; |
| | | |
| | | // The number of bytes currently in the queue |
| | | int queueByteSize = 0; |
| | | |
| | | private ReplicationDB db; |
| | | private ChangeNumber firstChange = null; |
| | | private ChangeNumber lastChange = null; |
| | |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | |
| | | // The High and low water mark for the max size of the msgQueue. |
| | | // the threads calling add() method will be blocked if the size of |
| | | // msgQueue becomes larger than the MSG_QUEUE_HIMARK and will resume |
| | | // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK. |
| | | final static int MSG_QUEUE_HIMARK = 5000; |
| | | final static int MSG_QUEUE_LOWMARK = 4000; |
| | | |
| | | // The maximum number of retries in case of DatabaseDeadlock Exception. |
| | | private static final int DEADLOCK_RETRIES = 10; |
| | | |
| | |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @param queueSize The queueSize to use when creating the dbHandler. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | public DbHandler( |
| | | short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv, int queueSize) |
| | | throws DatabaseException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = id; |
| | | serverId = id; |
| | | this.baseDn = baseDn; |
| | | this.trimage = replicationServer.getTrimage(); |
| | | trimage = replicationServer.getTrimage(); |
| | | queueHimark = queueSize; |
| | | queueLowmark = queueSize * 4 / 5; |
| | | queueHimarkBytes = 100 * queueHimark; |
| | | queueLowmarkBytes = 100 * queueLowmark; |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | while (size > MSG_QUEUE_HIMARK) |
| | | while ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | { |
| | | try |
| | | { |
| | |
| | | size = msgQueue.size(); |
| | | } |
| | | |
| | | queueByteSize += update.size(); |
| | | msgQueue.add(update); |
| | | if (lastChange == null || lastChange.older(update.getChangeNumber())) |
| | | { |
| | |
| | | int current = 0; |
| | | while ((current < number) && (!msgQueue.isEmpty())) |
| | | { |
| | | msgQueue.remove(); |
| | | UpdateMessage msg = msgQueue.remove(); |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | | if (msgQueue.size() < MSG_QUEUE_LOWMARK) |
| | | if ((msgQueue.size() < queueLowmark) && |
| | | (queueByteSize < queueLowmarkBytes)) |
| | | msgQueue.notify(); |
| | | } |
| | | } |
| | |
| | | private void flush() |
| | | { |
| | | int size; |
| | | int chunksize = (500 < queueHimark ? 500 : queueHimark); |
| | | |
| | | do |
| | | { |
| | | synchronized(flushLock) |
| | | { |
| | | // get N messages to save in the DB |
| | | List<UpdateMessage> changes = getChanges(500); |
| | | List<UpdateMessage> changes = getChanges(chunksize); |
| | | |
| | | // if no more changes to save exit immediately. |
| | | if ((changes == null) || ((size = changes.size()) == 0)) |
| | |
| | | // remove the changes from the list of changes to be saved. |
| | | clearQueue(changes.size()); |
| | | } |
| | | } while (size >=500); |
| | | } while (size >= chunksize); |
| | | } |
| | | |
| | | /** |
| | |
| | | attributes.add(new Attribute("last-change", |
| | | lastChange.toString() + " " + lastTime.toString())); |
| | | } |
| | | attributes.add( |
| | | new Attribute("queue-size", String.valueOf(msgQueue.size()))); |
| | | attributes.add( |
| | | new Attribute("queue-size-bytes", String.valueOf(queueByteSize))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | synchronized(flushLock) |
| | | { |
| | | msgQueue.clear(); |
| | | queueByteSize = 0; |
| | | } |
| | | db.clear(); |
| | | firstChange = db.readFirstChange(); |