| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.types.Attributes.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | |
| | | class MessageHandler extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | /** The logger of this class. */ |
| | | protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private static final int MINIMUM_TRESHOLD_MSG_QUEUE_SIZE = 5; |
| | | |
| | | /** |
| | | * UpdateMsg queue. |
| | | */ |
| | | /** UpdateMsg queue. */ |
| | | private final MsgQueue msgQueue = new MsgQueue(); |
| | | /** |
| | | * Late queue. All access to the lateQueue in getNextMessage() is |
| | |
| | | * need protecting against removals performed using getNextMessage(). |
| | | */ |
| | | private final MsgQueue lateQueue = new MsgQueue(); |
| | | /** |
| | | * Local hosting RS. |
| | | */ |
| | | protected ReplicationServer replicationServer; |
| | | /** |
| | | * Specifies the related replication server domain based on baseDN. |
| | | */ |
| | | /** Local hosting RS. */ |
| | | protected final ReplicationServer replicationServer; |
| | | /** Specifies the related replication server domain based on baseDN. */ |
| | | protected ReplicationServerDomain replicationServerDomain; |
| | | /** |
| | | * Number of updates sent to the server. |
| | | */ |
| | | /** Number of updates sent to the server. */ |
| | | private int outCount; |
| | | /** |
| | | * Number of updates received from the server. |
| | | */ |
| | | /** Number of updates received from the server. */ |
| | | private int inCount; |
| | | /** |
| | | * Specifies the max queue size for this handler. |
| | | */ |
| | | protected int maxQueueSize = 5000; |
| | | /** |
| | | * Specifies the max queue size in bytes for this handler. |
| | | */ |
| | | private int maxQueueBytesSize = maxQueueSize * 100; |
| | | /** |
| | | * Specifies whether the consumer is following the producer (is not late). |
| | | */ |
| | | /** Specifies the max queue size for this handler. */ |
| | | protected final int maxQueueSize; |
| | | /** Specifies the max queue size in bytes for this handler. */ |
| | | private final int maxQueueBytesSize; |
| | | /** Specifies whether the consumer is following the producer (is not late). */ |
| | | private boolean following; |
| | | /** |
| | | * Specifies the current serverState of this handler. |
| | | */ |
| | | /** Specifies the current serverState of this handler. */ |
| | | private ServerState serverState; |
| | | /** |
| | | * Specifies the baseDN of the domain. |
| | | */ |
| | | /** Specifies the baseDN of the domain. */ |
| | | private DN baseDN; |
| | | /** |
| | | * Specifies whether the consumer is still active or not. |
| | |
| | | * Called at the beginning of shutdown process. |
| | | */ |
| | | private boolean activeConsumer = true; |
| | | /** |
| | | * Set when ServerHandler is stopping. |
| | | */ |
| | | private AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| | | /** Set when ServerHandler is stopping. */ |
| | | private final AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | |
| | | |
| | | msgQueue.add(update); |
| | | |
| | | /* TODO : size should be configurable |
| | | * and larger than max-receive-queue-size |
| | | */ |
| | | // TODO : size should be configurable and larger than max-receive-queue-size |
| | | while (isMsgQueueAboveThreshold()) |
| | | { |
| | | following = false; |
| | |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | List<Attribute> attributes = new ArrayList<>(); |
| | | attributes.add(create("handler", getMonitorInstanceName())); |
| | | attributes.add(create("queue-size", String.valueOf(msgQueue.count()))); |
| | | attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | if (following) |
| | |
| | | */ |
| | | private void fillLateQueue(Set<Integer> connectedReplicaIds) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);) |
| | | { |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | final UpdateMsg record = cursor.getRecord(); |
| | |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private boolean isLateQueueBelowThreshold() |
| | |
| | | |
| | | private CSN findOldestCSNFromReplicaDBs() |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);) |
| | | { |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next()) |
| | | { |
| | | final UpdateMsg record = cursor.getRecord(); |
| | |
| | | { |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Increase the counter of updates received from the server. |
| | | */ |
| | | /** Adds one to {@code inCount}, the counter of updates received from the server. NOTE(review): plain {@code ++} with no locking inside this method - confirm callers serialize access. */ |
| | | void incrementInCount() |
| | | { |
| | | inCount++; |
| | | } |
| | | |
| | | /** |
| | | * Increase the counter of updates sent to the server. |
| | | */ |
| | | /** Increase the counter of updates sent to the server. */ |
| | | void incrementOutCount() |
| | | { |
| | | outCount++; |
| | |
| | | this.activeConsumer = active; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Set the initial value of the serverState for this handler. |
| | | * Expected to be done once, then the state will be updated using |
| | |
| | | this.serverState = serverState; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Set the baseDN for this handler. Expected to be done once and never changed |
| | | * during the handler life. |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this handler. |
| | | */ |
| | | /** Shutdown this handler. */ |
| | | public void shutdown() |
| | | { |
| | | synchronized (msgQueue) |
| | |
| | | return this.replicationServer.getServerURL(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |