| | |
| | | * LocalizableMessage are buffered into a queue. |
| | | * Consumers are expected to come and consume the UpdateMsg from the queue. |
| | | */ |
| | | public class MessageHandler extends MonitorProvider<MonitorProviderCfg> |
| | | class MessageHandler extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | /** The logger of this class. */ |
| | | protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | |
| | | /** |
| | | * Number of update sent to the server. |
| | | */ |
| | | protected int outCount = 0; |
| | | private int outCount = 0; |
| | | /** |
| | | * Number of updates received from the server. |
| | | */ |
| | | protected int inCount = 0; |
| | | private int inCount = 0; |
| | | /** |
| | | * Specifies the max queue size for this handler. |
| | | */ |
| | |
| | | /** |
| | | * Specifies the max queue size in bytes for this handler. |
| | | */ |
| | | protected int maxQueueBytesSize = maxQueueSize * 100; |
| | | private int maxQueueBytesSize = maxQueueSize * 100; |
| | | /** |
| | | * Specifies whether the consumer is following the producer (is not late). |
| | | */ |
| | |
| | | * in memory by this ServerHandler. |
| | | * @param replicationServer The hosting replication server. |
| | | */ |
| | | public MessageHandler(int queueSize, ReplicationServer replicationServer) |
| | | MessageHandler(int queueSize, ReplicationServer replicationServer) |
| | | { |
| | | this.maxQueueSize = queueSize; |
| | | this.maxQueueBytesSize = queueSize * 100; |
| | |
| | | * @param update The update that must be added to the list of updates of |
| | | * this handler. |
| | | */ |
| | | public void add(UpdateMsg update) |
| | | void add(UpdateMsg update) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | |
| | | * waiting for some changes, wake it up |
| | | */ |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | | msgQueue.notify(); |
| | | } |
| | | |
| | | msgQueue.add(update); |
| | | |
| | |
| | | * Set the shut down flag to true and returns the previous value of the flag. |
| | | * @return The previous value of the shut down flag |
| | | */ |
| | | public boolean engageShutdown() |
| | | boolean engageShutdown() |
| | | { |
| | | return shuttingDown.getAndSet(true); |
| | | } |
| | |
| | | * Returns the shutdown flag. |
| | | * @return The shutdown flag value. |
| | | */ |
| | | public boolean shuttingDown() |
| | | boolean shuttingDown() |
| | | { |
| | | return shuttingDown.get(); |
| | | } |
| | |
| | | * |
| | | * @param waitConnections Waits for the Connections with other RS to |
| | | * be established before returning. |
| | | * @return The replication server domain. |
| | | */ |
| | | public ReplicationServerDomain getDomain(boolean waitConnections) |
| | | private void setDomain(boolean waitConnections) |
| | | { |
| | | if (replicationServerDomain == null) |
| | | { |
| | |
| | | replicationServer.waitConnections(); |
| | | } |
| | | } |
| | | return replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | | * Get the count of updates received from the server. |
| | | * @return the count of update received from the server. |
| | | */ |
| | | public int getInCount() |
| | | int getInCount() |
| | | { |
| | | return inCount; |
| | | } |
| | |
| | | while (msgQueue.isEmpty() && following) |
| | | { |
| | | if (!synchronous) |
| | | { |
| | | return null; |
| | | } |
| | | msgQueue.wait(500); |
| | | if (!activeConsumer) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | |
| | | * Get the count of updates sent to this server. |
| | | * @return The count of update sent to this server. |
| | | */ |
| | | public int getOutCount() |
| | | int getOutCount() |
| | | { |
| | | return outCount; |
| | | } |
| | |
| | | /** |
| | | * Increase the counter of updates received from the server. |
| | | */ |
| | | public void incrementInCount() |
| | | void incrementInCount() |
| | | { |
| | | inCount++; |
| | | } |
| | |
| | | /** |
| | | * Increase the counter of updates sent to the server. |
| | | */ |
| | | public void incrementOutCount() |
| | | void incrementOutCount() |
| | | { |
| | | outCount++; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException, InitializationException |
| | |
| | | else |
| | | { |
| | | this.baseDN = baseDN; |
| | | if (!baseDN.toNormalizedString().equals("cn=changelog")) |
| | | { |
| | | getDomain(isDataServer); |
| | | } |
| | | setDomain(!"cn=changelog".equals(baseDN.toNormalizedString()) |
| | | && isDataServer); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param msg the last update sent. |
| | | * @return boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean updateServerState(UpdateMsg msg) |
| | | boolean updateServerState(UpdateMsg msg) |
| | | { |
| | | return serverState.update(msg.getCSN()); |
| | | } |