| | |
| | | private int queueByteSize = 0; | // Running total of update.size() for queued updates; reset to 0 when the db is cleared.
| | | |
| | | private ReplicationDB db; | // Backing store; the CSN bounds below are read from it.
| | | private CSN firstChange; | // NOTE(review): looks like the pre-rename twin of oldestCSN (every use is paired) - confirm only one should remain.
| | | private CSN lastChange; | // NOTE(review): looks like the pre-rename twin of newestCSN (every use is paired) - confirm only one should remain.
| | | private CSN oldestCSN; | // Lower CSN bound: read from db, set from the first queued update when null, advanced while trimming.
| | | private CSN newestCSN; | // Upper CSN bound: read from db, raised when a queued update carries a newer CSN.
| | | private int serverId; | // Included in toString() output.
| | | private DN baseDN; | // Published as "domain-name" monitor attribute and included in toString().
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | |
| | | queueLowmarkBytes = 200 * queueLowmark; |
| | | queueHimarkBytes = 200 * queueLowmark; |
| | | db = new ReplicationDB(id, baseDN, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | oldestCSN = db.readOldestCSN(); |
| | | newestCSN = db.readNewestCSN(); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + id |
| | |
| | | |
| | | queueByteSize += update.size(); |
| | | msgQueue.add(update); |
| | | if (lastChange == null || lastChange.older(update.getCSN())) |
| | | if (newestCSN == null || newestCSN.older(update.getCSN())) |
| | | { |
| | | lastChange = update.getCSN(); |
| | | newestCSN = update.getCSN(); |
| | | } |
| | | if (firstChange == null) |
| | | if (oldestCSN == null) |
| | | { |
| | | firstChange = update.getCSN(); |
| | | oldestCSN = update.getCSN(); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the firstChange. |
| | | * @return Returns the firstChange. |
| | | * Get the oldest CSN that has not been purged yet. |
| | | * |
| | | * @return the oldest CSN that has not been purged yet. |
| | | * May be null: other code in this class null-checks the field before using it. |
| | | */ |
| | | public CSN getFirstChange() | // NOTE(review): differs from the next line only by the rename; looks like both sides of a diff were kept - confirm which signature should remain.
| | | public CSN getOldestCSN() |
| | | { |
| | | return firstChange; | // NOTE(review): pre-rename twin of the return below; a second return after this one would be unreachable.
| | | return oldestCSN; | // Plain field read; no copy or recomputation.
| | | } |
| | | |
| | | /** |
| | | * Get the lastChange. |
| | | * @return Returns the lastChange. |
| | | * Get the newest CSN that has not been purged yet. |
| | | * |
| | | * @return the newest CSN that has not been purged yet. |
| | | * May be null: other code in this class null-checks the field before using it. |
| | | */ |
| | | public CSN getLastChange() | // NOTE(review): differs from the next line only by the rename; looks like both sides of a diff were kept - confirm which signature should remain.
| | | public CSN getNewestCSN() |
| | | { |
| | | return lastChange; | // NOTE(review): pre-rename twin of the return below; a second return after this one would be unreachable.
| | | return newestCSN; | // Plain field read; no copy or recomputation.
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getChangesCount() | // Derives a count from the seqnums of the two CSN bounds; no database access here.
| | | { |
| | | if (lastChange != null && firstChange != null) | // NOTE(review): pre-rename twin of the condition below - confirm only one should remain.
| | | if (newestCSN != null && oldestCSN != null) | // Both bounds are needed; otherwise fall through to 0.
| | | { |
| | | return lastChange.getSeqnum() - firstChange.getSeqnum() + 1; | // NOTE(review): pre-rename twin of the return below.
| | | return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1; | // Inclusive span: +1 so that identical bounds count as one change.
| | | } |
| | | return 0; | // At least one bound is null.
| | | } |
| | |
| | | return; |
| | | } |
| | | |
| | | if (!csn.equals(lastChange) && csn.older(trimDate)) |
| | | if (!csn.equals(newestCSN) && csn.older(trimDate)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | firstChange = csn; |
| | | oldestCSN = csn; |
| | | return; |
| | | } |
| | | } |
| | |
| | | String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", |
| | | baseDN.toNormalizedString())); |
| | | if (firstChange != null) |
| | | if (oldestCSN != null) |
| | | { |
| | | attributes.add(Attributes.create("first-change", encode(firstChange))); |
| | | attributes.add(Attributes.create("first-change", encode(oldestCSN))); |
| | | } |
| | | if (lastChange != null) |
| | | if (newestCSN != null) |
| | | { |
| | | attributes.add(Attributes.create("last-change", encode(lastChange))); |
| | | attributes.add(Attributes.create("last-change", encode(newestCSN))); |
| | | } |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.size()))); |
| | |
| | | @Override |
| | | public String toString() | // Space-separated summary: baseDN, serverId, then the two CSN bounds.
| | | { |
| | | return baseDN + " " + serverId + " " + firstChange + " " + lastChange; | // NOTE(review): pre-rename twin of the return below; a second return after this one would be unreachable.
| | | return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN; | // Null bounds are rendered as "null" by string concatenation.
| | | } |
| | | |
| | | /** |
| | |
| | | queueByteSize = 0; |
| | | |
| | | db.clear(); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | oldestCSN = db.readOldestCSN(); |
| | | newestCSN = db.readNewestCSN(); |
| | | } |
| | | } |
| | | |