| | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | if ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | if (size > queueHimark || queueByteSize > queueHimarkBytes) |
| | | { |
| | | msgQueue.notify(); |
| | | } |
| | | |
| | | while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes)) |
| | | while (size > queueMaxSize || queueByteSize > queueMaxBytes) |
| | | { |
| | | try |
| | | { |
| | |
| | | */ |
| | | private List<UpdateMsg> getChanges(int number) |
| | | { |
| | | int current = 0; |
| | | LinkedList<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | while ((current < number) && (current < size)) |
| | | { |
| | | UpdateMsg msg = msgQueue.get(current); |
| | | current++; |
| | | changes.add(msg); |
| | | } |
| | | final int minAvailableNb = Math.min(number, msgQueue.size()); |
| | | return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb)); |
| | | } |
| | | return changes; |
| | | } |
| | | |
| | | /** |
| | |
| | | synchronized (msgQueue) |
| | | { |
| | | int current = 0; |
| | | while ((current < number) && (!msgQueue.isEmpty())) |
| | | while (current < number && !msgQueue.isEmpty()) |
| | | { |
| | | UpdateMsg msg = msgQueue.remove(); // remove first |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | | if ((msgQueue.size() < queueLowmark) |
| | | && (queueByteSize < queueLowmarkBytes)) |
| | | if (msgQueue.size() < queueLowmark |
| | | && queueByteSize < queueLowmarkBytes) |
| | | { |
| | | msgQueue.notifyAll(); |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | this.wait(); |
| | | wait(); |
| | | } catch (Exception e) |
| | | { /* do nothing */} |
| | | } |
| | |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | if ((msgQueue.size() < queueLowmark) && |
| | | (queueByteSize < queueLowmarkBytes)) |
| | | if (msgQueue.size() < queueLowmark |
| | | && queueByteSize < queueLowmarkBytes) |
| | | { |
| | | try |
| | | { |
| | |
| | | done = true; |
| | | } |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | |
| | | synchronized (this) |
| | | { |
| | | done = true; |
| | | this.notifyAll(); |
| | | notifyAll(); |
| | | } |
| | | } |
| | | |
| | |
| | | return; |
| | | } |
| | | |
| | | if ((!changeNumber.equals(lastChange)) |
| | | && (changeNumber.older(trimDate))) |
| | | if (!changeNumber.equals(lastChange) |
| | | && changeNumber.older(trimDate)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | |
| | | public void flush() |
| | | { |
| | | int size; |
| | | int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); |
| | | int chunksize = Math.min(queueMaxSize, 500); |
| | | |
| | | do |
| | | { |
| | |
| | | List<UpdateMsg> changes = getChanges(chunksize); |
| | | |
| | | // if no more changes to save exit immediately. |
| | | if ((changes == null) || ((size = changes.size()) == 0)) |
| | | if (changes == null || (size = changes.size()) == 0) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // save the change to the stable storage. |
| | | db.addEntries(changes); |
| | |
| | | // (remove from the beginning of the queue) |
| | | clearQueue(changes.size()); |
| | | } |
| | | } while (size >= chunksize); |
| | | // loop while there are more changes in the queue |
| | | } while (size == chunksize); |
| | | } |
| | | |
| | | /** |
| | |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | if (firstChange != null) |
| | | { |
| | | Date firstTime = new Date(firstChange.getTime()); |
| | | attributes.add(Attributes.create("first-change", firstChange |
| | | .toString() |
| | | + " " + firstTime.toString())); |
| | | attributes.add(Attributes.create("first-change", encode(firstChange))); |
| | | } |
| | | if (lastChange != null) |
| | | { |
| | | Date lastTime = new Date(lastChange.getTime()); |
| | | attributes.add(Attributes.create("last-change", lastChange |
| | | .toString() |
| | | + " " + lastTime.toString())); |
| | | attributes.add(Attributes.create("last-change", encode(lastChange))); |
| | | } |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.size()))); |
| | | attributes.add( |
| | | Attributes.create("queue-size-bytes", String.valueOf(queueByteSize))); |
| | | |
| | | return attributes; |
| | | } |
| | | |
| | | private String encode(ChangeNumber changeNumber) |
| | | { |
| | | return changeNumber + " " + new Date(changeNumber.getTime()); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return(baseDn + " " + serverId + " " + firstChange + " " + lastChange); |
| | | return baseDn + " " + serverId + " " + firstChange + " " + lastChange; |
| | | } |
| | | |
| | | /** |
| | |
| | | * changes from the DB. |
| | | * @throws Exception When an exception occurs while accessing a resource |
| | | * from the DB. |
| | | * |
| | | */ |
| | | public void clear() throws DatabaseException, Exception |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Getter fot the serverID of the server for which this database is managed. |
| | | * Getter for the serverID of the server for which this database is managed. |
| | | * |
| | | * @return the serverId. |
| | | */ |
| | |
| | | // Now that we always keep the last ChangeNumber in the DB to avoid |
| | | // expiring cookies too quickly, we need to check if the "to" |
| | | // is older than the trim date. |
| | | if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0))) |
| | | if (to == null || !to.older(new ChangeNumber(latestTrimDate, 0, 0))) |
| | | { |
| | | flush(); |
| | | return db.count(from, to); |