| | |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private ReplicationServerDomain replicationServerDomain = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | | |
| | | private int inCount = 0; // number of updates received from the server |
| | | // Number of update sent to the server |
| | | private int outCount = 0; |
| | | // Number of updates received from the server |
| | | private int inCount = 0; |
| | | |
| | | // Number of updates received from the server in assured safe read mode |
| | | private int assuredSrReceivedUpdates = 0; |
| | | // Number of updates received from the server in assured safe read mode that |
| | | // timed out |
| | | private AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates sent to the server in assured safe read mode |
| | | private int assuredSrSentUpdates = 0; |
| | | // Number of updates sent to the server in assured safe read mode that timed |
| | | // out |
| | | private AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates received from the server in assured safe data mode |
| | | private int assuredSdReceivedUpdates = 0; |
| | | // Number of updates received from the server in assured safe data mode that |
| | | // timed out |
| | | private AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); |
| | | // Number of updates sent to the server in assured safe data mode |
| | | private int assuredSdSentUpdates = 0; |
| | | // Number of updates sent to the server in assured safe data mode that timed |
| | | // out |
| | | private AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); |
| | | |
| | | private int maxReceiveQueue = 0; |
| | | private int maxSendQueue = 0; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates received from the server in assured safe read |
| | | * mode. |
| | | * @return The number of updates received from the server in assured safe read |
| | | * mode |
| | | */ |
| | | public int getAssuredSrReceivedUpdates() |
| | | { |
| | | return assuredSrReceivedUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates received from the server in assured safe read |
| | | * mode that timed out. |
| | | * @return The number of updates received from the server in assured safe read |
| | | * mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | return assuredSrReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent to the server in assured safe read mode. |
| | | * @return The number of updates sent to the server in assured safe read mode |
| | | */ |
| | | public int getAssuredSrSentUpdates() |
| | | { |
| | | return assuredSrSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent to the server in assured safe read mode that |
| | | * timed out. |
| | | * @return The number of updates sent to the server in assured safe read mode |
| | | * that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSrSentUpdatesTimeout() |
| | | { |
| | | return assuredSrSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates received from the server in assured safe data |
| | | * mode. |
| | | * @return The number of updates received from the server in assured safe data |
| | | * mode |
| | | */ |
| | | public int getAssuredSdReceivedUpdates() |
| | | { |
| | | return assuredSdReceivedUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates received from the server in assured safe data |
| | | * mode that timed out. |
| | | * @return The number of updates received from the server in assured safe data |
| | | * mode that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | return assuredSdReceivedUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent to the server in assured safe data mode. |
| | | * @return The number of updates sent to the server in assured safe data mode |
| | | */ |
| | | public int getAssuredSdSentUpdates() |
| | | { |
| | | return assuredSdSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent to the server in assured safe data mode that |
| | | * timed out. |
| | | * @return The number of updates sent to the server in assured safe data mode |
| | | * that timed out. |
| | | */ |
| | | public AtomicInteger getAssuredSdSentUpdatesTimeout() |
| | | { |
| | | return assuredSdSentUpdatesTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates received from the server in assured safe |
| | | * read mode. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdates() |
| | | { |
| | | assuredSrReceivedUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates received from the server in assured safe |
| | | * read mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrReceivedUpdatesTimeout() |
| | | { |
| | | assuredSrReceivedUpdatesTimeout.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe read |
| | | * mode. |
| | | */ |
| | | public void incrementAssuredSrSentUpdates() |
| | | { |
| | | assuredSrSentUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe read |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSrSentUpdatesTimeout() |
| | | { |
| | | assuredSrSentUpdatesTimeout.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates received from the server in assured safe |
| | | * data mode. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdates() |
| | | { |
| | | assuredSdReceivedUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates received from the server in assured safe |
| | | * data mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdReceivedUpdatesTimeout() |
| | | { |
| | | assuredSdReceivedUpdatesTimeout.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe data |
| | | * mode. |
| | | */ |
| | | public void incrementAssuredSdSentUpdates() |
| | | { |
| | | assuredSdSentUpdates++; |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of updates sent to the server in assured safe data |
| | | * mode that timed out. |
| | | */ |
| | | public void incrementAssuredSdSentUpdatesTimeout() |
| | | { |
| | | assuredSdSentUpdatesTimeout.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Check is this server is saturated (this server has already been |
| | | * sent a bunch of updates and has not processed them so they are staying |
| | | * in the message queue for this server an the size of the queue |
| | |
| | | // loop until not interrupted |
| | | } |
| | | } while (((interrupted) || (!acquired)) && (!shutdownWriter)); |
| | | this.incrementOutCount(); |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | | if (msg.isAssured()) |
| | | { |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | incrementAssuredSrSentUpdates(); |
| | | } else |
| | | { |
| | | if (!isLDAPserver()) |
| | | incrementAssuredSdSentUpdates(); |
| | | } |
| | | } |
| | | } |
| | | return msg; |
| | | } |
| | | |
| | |
| | | // Deprecated |
| | | attributes.add(Attributes.create("max-waiting-changes", String |
| | | .valueOf(maxQueueSize))); |
| | | attributes.add(Attributes.create("update-sent", String |
| | | attributes.add(Attributes.create("sent-updates", String |
| | | .valueOf(getOutCount()))); |
| | | attributes.add(Attributes.create("update-received", String |
| | | attributes.add(Attributes.create("received-updates", String |
| | | .valueOf(getInCount()))); |
| | | |
| | | // Assured counters |
| | | attributes.add(Attributes.create("assured-sr-received-updates", String |
| | | .valueOf(getAssuredSrReceivedUpdates()))); |
| | | attributes.add(Attributes.create("assured-sr-received-updates-timeout", |
| | | String .valueOf(getAssuredSrReceivedUpdatesTimeout()))); |
| | | attributes.add(Attributes.create("assured-sr-sent-updates", String |
| | | .valueOf(getAssuredSrSentUpdates()))); |
| | | attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String |
| | | .valueOf(getAssuredSrSentUpdatesTimeout()))); |
| | | attributes.add(Attributes.create("assured-sd-received-updates", String |
| | | .valueOf(getAssuredSdReceivedUpdates()))); |
| | | if (!isLDAPserver()) |
| | | { |
| | | attributes.add(Attributes.create("assured-sd-sent-updates", |
| | | String.valueOf(getAssuredSdSentUpdates()))); |
| | | attributes.add(Attributes.create("assured-sd-sent-updates-timeout", |
| | | String.valueOf(getAssuredSdSentUpdatesTimeout()))); |
| | | } else |
| | | { |
| | | attributes.add(Attributes.create("assured-sd-received-updates-timeout", |
| | | String.valueOf(getAssuredSdReceivedUpdatesTimeout()))); |
| | | } |
| | | |
| | | // Window stats |
| | | attributes.add(Attributes.create("max-send-window", String |
| | | .valueOf(sendWindowSize))); |