| | |
| | | private int serverId; |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | | private ReplicationServerDomain replicationServerDomain; |
| | | |
| | | /** |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param serverId The server ID of the server from which we read messages. |
| | | * @param handler The server handler for this server reader. |
| | | * @param replicationServerDomain The ReplicationServerDomain for this server |
| | | * reader. |
| | | */ |
| | | public ServerReader(ProtocolSession session, int serverId, |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | ServerHandler handler) |
| | | { |
| | | super("Replication Reader Thread for handler of " + |
| | | handler.toString() + |
| | | " in " + replicationServerDomain); |
| | | super("Replication Reader Thread for RS handler " + |
| | | handler.getMonitorInstanceName()); |
| | | this.session = session; |
| | | this.serverId = serverId; |
| | | this.handler = handler; |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationServerDomain + " " + |
| | | getName() + " receives " + msg); |
| | | TRACER.debugInfo("In " + getName() + " receives " + msg); |
| | | } |
| | | |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | | handler.checkWindow(); |
| | | replicationServerDomain.processAck(ack, handler); |
| | | handler.processAck(ack); |
| | | } else if (msg instanceof UpdateMsg) |
| | | { |
| | | boolean filtered = false; |
| | |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | { |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | long referenceGenerationId = handler.getReferenceGenId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get( |
| | | Integer.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | replicationServerDomain.getBaseDn(), |
| | | Integer.toString(handler.getReplicationServerId()), |
| | | handler.getServiceId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Integer.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get( |
| | | Integer.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | replicationServerDomain.getBaseDn(), |
| | | Integer.toString(handler.getReplicationServerId()), |
| | | handler.getServiceId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Integer.toString(handler.getServerId()))); |
| | | filtered = true; |
| | |
| | | * Ignore updates from RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | long referenceGenerationId =handler.getReferenceGenId(); |
| | | if ((referenceGenerationId > 0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError( |
| | | ERR_IGNORING_UPDATE_FROM_RS.get( |
| | | Integer.toString( |
| | | replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn(), |
| | | handler.getReplicationServerId()), |
| | | handler.getServiceId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Integer.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | |
| | | { |
| | | UpdateMsg update = (UpdateMsg) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationServerDomain.put(update, handler); |
| | | handler.put(update); |
| | | } |
| | | } else if (msg instanceof WindowMsg) |
| | | { |
| | |
| | | } else if (msg instanceof ResetGenerationIdMsg) |
| | | { |
| | | ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; |
| | | replicationServerDomain.resetGenerationId(handler, genIdMsg); |
| | | handler.processResetGenId(genIdMsg); |
| | | } else if (msg instanceof WindowProbeMsg) |
| | | { |
| | | WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; |
| | |
| | | try |
| | | { |
| | | ReplicationServerHandler rsh = (ReplicationServerHandler)handler; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, |
| | | rsh, true); |
| | | rsh.receiveTopoInfoFromRS(topoMsg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | try |
| | | { |
| | | DataServerHandler dsh = (DataServerHandler)handler; |
| | | replicationServerDomain.processNewStatus(dsh, csMsg); |
| | | dsh.receiveNewStatus(csMsg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | errMessage = |
| | | ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get( |
| | | replicationServerDomain.getBaseDn(), |
| | | handler.getServiceId(), |
| | | Integer.toString(handler.getServerId()), |
| | | csMsg.toString()); |
| | | logError(errMessage); |
| | |
| | | } else if (msg instanceof ChangeTimeHeartbeatMsg) |
| | | { |
| | | ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg; |
| | | replicationServerDomain.processChangeTimeHeartbeatMsg(handler, |
| | | cthbMsg); |
| | | handler.process(cthbMsg); |
| | | } else if (msg instanceof StopMsg) |
| | | { |
| | | // Peer server is properly disconnecting: go out of here to |
| | |
| | | { |
| | | TRACER.debugInfo(handler.toString() + " has properly " + |
| | | "disconnected from this replication server " + |
| | | Integer.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId())); |
| | | Integer.toString(handler.getReplicationServerId())); |
| | | } |
| | | return; |
| | | } else if (msg == null) |
| | |
| | | // we just trash the message and log the event for debug purpose, |
| | | // then continue receiving messages. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + e.getMessage()); |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION for serverID=" + serverId + " " + |
| | | this + " " + |
| | | stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(), |
| | | Integer.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId())); |
| | | Integer.toString(handler.getReplicationServerId())); |
| | | logError(errMessage); |
| | | } |
| | | catch (ClassNotFoundException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CNF EXCEPTION serverID=" + serverId + |
| | | stackTraceToSingleLineString(e)); |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId + |
| | | " " + stackTraceToSingleLineString(e)); |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | |
| | | */ |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | this + " is closing the session"); |
| | | if (handler.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this.getName() + " closing the session"); |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | | } |
| | | replicationServerDomain.stopServer(handler); |
| | | handler.doStop(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " stopped " + errMessage); |