| | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | |
| | | // Changes are not read back by replicationServer threads that are responsible |
| | | // for pushing the changes to other replication servers or to LDAP servers. |
| | | // |
| | | private final LinkedList<UpdateMessage> msgQueue = |
| | | new LinkedList<UpdateMessage>(); |
| | | private final LinkedList<UpdateMsg> msgQueue = |
| | | new LinkedList<UpdateMsg>(); |
| | | |
| | | // The high and low water marks for the max size of the msgQueue. |
| | | // The threads calling the add() method will be blocked if the size of |
| | |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, |
| | | "Replication Server db " + id + " " + baseDn); |
| | | "Replication Server db for DS " + id + " and " + baseDn + " in RS " + |
| | | replicationServer.getServerId()); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider( |
| | |
| | | * @param update The update that must be saved to the db managed by this db |
| | | * handler. |
| | | */ |
| | | public void add(UpdateMessage update) |
| | | public void add(UpdateMsg update) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | |
| | | * @param number the number of messages to extract. |
| | | * @return a List containing up to number changes extracted from the queue. |
| | | */ |
| | | private List<UpdateMessage> getChanges(int number) |
| | | private List<UpdateMsg> getChanges(int number) |
| | | { |
| | | int current = 0; |
| | | LinkedList<UpdateMessage> changes = new LinkedList<UpdateMessage>(); |
| | | LinkedList<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | while ((current < number) && (current < size)) |
| | | { |
| | | UpdateMessage msg = msgQueue.get(current); |
| | | UpdateMsg msg = msgQueue.get(current); |
| | | current++; |
| | | changes.add(msg); |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | UpdateMessage msg = msgQueue.getFirst(); |
| | | UpdateMsg msg = msgQueue.getFirst(); |
| | | recentChangeNumber = msg.getChangeNumber(); |
| | | } |
| | | catch (NoSuchElementException e) |
| | |
| | | int current = 0; |
| | | while ((current < number) && (!msgQueue.isEmpty())) |
| | | { |
| | | UpdateMessage msg = msgQueue.remove(); |
| | | UpdateMsg msg = msgQueue.remove(); |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | |
| | | synchronized(flushLock) |
| | | { |
| | | // Get up to chunksize messages to save in the DB. |
| | | List<UpdateMessage> changes = getChanges(chunksize); |
| | | List<UpdateMsg> changes = getChanges(chunksize); |
| | | |
| | | // If there are no more changes to save, exit immediately. |
| | | if ((changes == null) || ((size = changes.size()) == 0)) |
| | |
| | | public ArrayList<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(new Attribute("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", baseDn.toString())); |
| | | attributes.add(Attributes.create("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("base-dn", baseDn.toString())); |
| | | if (firstChange != null) |
| | | { |
| | | Date firstTime = new Date(firstChange.getTime()); |
| | | attributes.add(new Attribute("first-change", |
| | | firstChange.toString() + " " + firstTime.toString())); |
| | | attributes.add(Attributes.create("first-change", firstChange |
| | | .toString() |
| | | + " " + firstTime.toString())); |
| | | } |
| | | if (lastChange != null) |
| | | { |
| | | Date lastTime = new Date(lastChange.getTime()); |
| | | attributes.add(new Attribute("last-change", |
| | | lastChange.toString() + " " + lastTime.toString())); |
| | | attributes.add(Attributes.create("last-change", lastChange |
| | | .toString() |
| | | + " " + lastTime.toString())); |
| | | } |
| | | attributes.add( |
| | | new Attribute("queue-size", String.valueOf(msgQueue.size()))); |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.size()))); |
| | | attributes.add( |
| | | new Attribute("queue-size-bytes", String.valueOf(queueByteSize))); |
| | | Attributes.create("queue-size-bytes", String.valueOf(queueByteSize))); |
| | | |
| | | return attributes; |
| | | } |