| | |
| | | |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | | private ReplicationCache replicationCache; |
| | | private ReplicationServerDomain replicationServerDomain; |
| | | private short serverId; |
| | | |
| | | /** |
| | |
| | | * @param session the ProtocolSession that will be used to send updates. |
| | | * @param serverId the Identifier of the server. |
| | | * @param handler handler for which the ServerWriter is created. |
| | | * @param replicationCache The ReplicationCache of this ServerWriter. |
| | | * @param replicationServerDomain The ReplicationServerDomain of this |
| | | * ServerWriter. |
| | | */ |
| | | public ServerWriter(ProtocolSession session, short serverId, |
| | | ServerHandler handler, ReplicationCache replicationCache) |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(handler.toString() + " writer"); |
| | | |
| | | this.serverId = serverId; |
| | | this.session = session; |
| | | this.handler = handler; |
| | | this.replicationCache = replicationCache; |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | | * Run method for the ServerWriter. |
| | | * Loops waiting for changes from the ReplicationCache and forward them |
| | | * Loops waiting for changes from the ReplicationServerDomain and forward them |
| | | * to the other servers |
| | | */ |
| | | public void run() |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | UpdateMessage update = replicationCache.take(this.handler); |
| | | UpdateMessage update = replicationServerDomain.take(this.handler); |
| | | if (update == null) |
| | | return; /* this connection is closing */ |
| | | |
| | | // Ignore update to be sent to a replica with a bad generation ID |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | long referenceGenerationId = replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId != handler.getGenerationId()) |
| | | || (referenceGenerationId == -1) |
| | | || (handler.getGenerationId() == -1)) |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In " + replicationCache.getReplicationServer(). |
| | | "In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", writer to " + this.handler.getMonitorInstanceName() + |
| | | " publishes msg=" + update.toString() + |
| | |
| | | { |
| | | // Can't do much more : ignore |
| | | } |
| | | replicationCache.stopServer(handler); |
| | | replicationServerDomain.stopServer(handler); |
| | | |
| | | if (debugEnabled()) |
| | | { |