| | |
| | | private short serverId; |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | | private ReplicationCache replicationCache; |
| | | private ReplicationServerDomain replicationServerDomain; |
| | | |
| | | /** |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param serverId The server ID of the server from which we read messages. |
| | | * @param handler The server handler for this server reader. |
| | | * @param replicationCache The ReplicationCache for this server reader. |
| | | * @param replicationServerDomain The ReplicationServerDomain for this server |
| | | * reader. |
| | | */ |
| | | public ServerReader(ProtocolSession session, short serverId, |
| | | ServerHandler handler, ReplicationCache replicationCache) |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(handler.toString() + " reader"); |
| | | this.session = session; |
| | | this.serverId = serverId; |
| | | this.handler = handler; |
| | | this.replicationCache = replicationCache; |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS ":" LS")+ |
| | | " reader starting for serverId=" + serverId); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | | * grab all incoming messages and publish them to the replicationCache |
| | | * grab all incoming messages and publish them to the |
| | | * replicationServerDomain |
| | | */ |
| | | try |
| | | { |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | handler.checkWindow(); |
| | | replicationCache.ack(ack, serverId); |
| | | replicationServerDomain.ack(ack, serverId); |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | // Ignore update received from a replica with |
| | | // a bad generation ID |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationCache.put(update, handler); |
| | | replicationServerDomain.put(update, handler); |
| | | } |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | |
| | | else if (msg instanceof ResetGenerationId) |
| | | { |
| | | ResetGenerationId genIdMsg = (ResetGenerationId) msg; |
| | | replicationCache.resetGenerationId(this.handler, genIdMsg); |
| | | replicationServerDomain.resetGenerationId(this.handler, genIdMsg); |
| | | } |
| | | else if (msg instanceof WindowProbe) |
| | | { |
| | |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getServerId() + |
| | | " Receiving replServerInfo from " + handler.getServerId() + |
| | | " baseDn=" + replicationCache.getBaseDn() + |
| | | " baseDn=" + replicationServerDomain.getBaseDn() + |
| | | " genId=" + infoMsg.getGenerationId()); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId()<0) |
| | | if (replicationServerDomain.getGenerationId()<0) |
| | | { |
| | | // Here is the case where a ReplicationServer receives from |
| | | // another ReplicationServer the generationId for a domain |
| | | // for which the generation ID has never been set. |
| | | replicationCache.setGenerationId(infoMsg.getGenerationId(), false); |
| | | replicationServerDomain. |
| | | setGenerationId(infoMsg.getGenerationId(),false); |
| | | } |
| | | else |
| | | { |
| | |
| | | // If we have generationId set locally and no server currently |
| | | // connected for that domain in the topology then we may also |
| | | // reset the generationId localy. |
| | | replicationCache.mayResetGenerationId(); |
| | | replicationServerDomain.mayResetGenerationId(); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId() != infoMsg.getGenerationId()) |
| | | if (replicationServerDomain.getGenerationId() != |
| | | infoMsg.getGenerationId()) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | replicationCache.getBaseDn().toNormalizedString(), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(infoMsg.getGenerationId()), |
| | | Long.toString(replicationCache.getGenerationId())); |
| | | Long.toString(replicationServerDomain.getGenerationId())); |
| | | |
| | | ErrorMessage errorMsg = new ErrorMessage( |
| | | replicationCache.getReplicationServer().getServerId(), |
| | | replicationServerDomain.getReplicationServer().getServerId(), |
| | | handler.getServerId(), |
| | | message); |
| | | session.publish(errorMsg); |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION for serverID=" + serverId |
| | | + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CNF EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader for serverID=" + serverId + |
| | | " is closing the session"); |
| | |
| | | { |
| | | // ignore |
| | | } |
| | | replicationCache.stopServer(handler); |
| | | replicationServerDomain.stopServer(handler); |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS":" LDAP") + |
| | | " server reader stopped for serverID=" + serverId); |