| | |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.messages.Message; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | |
| | | import java.io.IOException; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.DoneMsg; |
| | | import org.opends.server.replication.protocol.EntryMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.InitializeRequestMsg; |
| | | import org.opends.server.replication.protocol.InitializeTargetMsg; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.WindowMessage; |
| | | import org.opends.server.replication.protocol.WindowProbe; |
| | | import org.opends.server.replication.protocol.ReplServerInfoMessage; |
| | | import org.opends.server.replication.protocol.MonitorMessage; |
| | | import org.opends.server.replication.protocol.MonitorRequestMessage; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.WindowMsg; |
| | | import org.opends.server.replication.protocol.WindowProbeMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol. |
| | | NotSupportedOldVersionPDUException; |
| | | |
| | | /** |
| | | * This class implement the part of the replicationServer that is reading |
| | |
| | | */ |
| | | public class ServerReader extends DirectoryThread |
| | | { |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private short serverId; |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | |
| | | * reader. |
| | | */ |
| | | public ServerReader(ProtocolSession session, short serverId, |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(handler.toString() + " reader"); |
| | | super("Replication Reader for " + handler.toString() + " in RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId()); |
| | | this.session = session; |
| | | this.serverId = serverId; |
| | | this.handler = handler; |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS ":" LS")+ |
| | | " reader starting for serverId=" + serverId); |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer() ? " RS " : " LS") + |
| | | " reader starting for serverId=" + serverId); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | ReplicationMessage msg = session.receive(); |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | | /* |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | | } |
| | | */ |
| | | if (msg instanceof AckMessage) |
| | | */ |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | AckMsg ack = (AckMsg) msg; |
| | | handler.checkWindow(); |
| | | replicationServerDomain.ack(ack, serverId); |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | } else if (msg instanceof UpdateMsg) |
| | | { |
| | | // Ignore update received from a replica with |
| | | // a bad generation ID |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | boolean filtered = false; |
| | | /* Ignore updates in some cases */ |
| | | if (handler.isLDAPserver()) |
| | | { |
| | | /** |
| | | * Ignore updates from DS in bad BAD_GENID_STATUS or |
| | | * FULL_UPDATE_STATUS |
| | | * |
| | | * The RSD lock should not be taken here as it is acceptable to have |
| | | * a delay between the time the server has a wrong status and the |
| | | * fact we detect it: the updates that succeed to pass during this |
| | | * time will have no impact on remote server. But it is interesting |
| | | * to not saturate uselessly the network if the updates are not |
| | | * necessary so this check to stop sending updates is interesting |
| | | * anyway. Not taking the RSD lock allows to have better |
| | | * performances in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | { |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()))); |
| | | filtered = true; |
| | | } |
| | | } else |
| | | { |
| | | /** |
| | | * Ignore updates from RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId > 0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM.get( |
| | | msg.toString(), |
| | | handler.getMonitorInstanceName())); |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM_RS.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | filtered = true; |
| | | } |
| | | } |
| | | else |
| | | |
| | | if (!filtered) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | UpdateMsg update = (UpdateMsg) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationServerDomain.put(update, handler); |
| | | } |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | } else if (msg instanceof WindowMsg) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | } else if (msg instanceof InitializeRequestMsg) |
| | | { |
| | | InitializeRequestMessage initializeMsg = |
| | | (InitializeRequestMessage) msg; |
| | | InitializeRequestMsg initializeMsg = |
| | | (InitializeRequestMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | } else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg; |
| | | InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } |
| | | else if (msg instanceof EntryMessage) |
| | | } else if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMessage entryMsg = (EntryMessage) msg; |
| | | EntryMsg entryMsg = (EntryMsg) msg; |
| | | handler.process(entryMsg); |
| | | } |
| | | else if (msg instanceof DoneMessage) |
| | | } else if (msg instanceof DoneMsg) |
| | | { |
| | | DoneMessage doneMsg = (DoneMessage) msg; |
| | | DoneMsg doneMsg = (DoneMsg) msg; |
| | | handler.process(doneMsg); |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | } else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMessage errorMsg = (ErrorMessage) msg; |
| | | ErrorMsg errorMsg = (ErrorMsg) msg; |
| | | handler.process(errorMsg); |
| | | } |
| | | else if (msg instanceof ResetGenerationId) |
| | | } else if (msg instanceof ResetGenerationIdMsg) |
| | | { |
| | | ResetGenerationId genIdMsg = (ResetGenerationId) msg; |
| | | replicationServerDomain.resetGenerationId(this.handler, genIdMsg); |
| | | } |
| | | else if (msg instanceof WindowProbe) |
| | | ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; |
| | | replicationServerDomain.resetGenerationId(handler, genIdMsg); |
| | | } else if (msg instanceof WindowProbeMsg) |
| | | { |
| | | WindowProbe windowProbeMsg = (WindowProbe) msg; |
| | | WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; |
| | | handler.process(windowProbeMsg); |
| | | } |
| | | else if (msg instanceof ReplServerInfoMessage) |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; |
| | | handler.receiveReplServerInfo(infoMsg); |
| | | replicationServerDomain.receiveReplServerInfo(infoMsg, handler); |
| | | } |
| | | else if (msg instanceof MonitorRequestMessage) |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true); |
| | | } else if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | MonitorRequestMessage replServerMonitorRequestMsg = |
| | | (MonitorRequestMessage) msg; |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; |
| | | replicationServerDomain.processNewStatus(handler, csMsg); |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | MonitorRequestMsg replServerMonitorRequestMsg = |
| | | (MonitorRequestMsg) msg; |
| | | handler.process(replServerMonitorRequestMsg); |
| | | } |
| | | else if (msg instanceof MonitorMessage) |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMessage replServerMonitorMsg = (MonitorMessage) msg; |
| | | MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; |
| | | handler.process(replServerMonitorMsg); |
| | | } |
| | | else if (msg == null) |
| | | } else if (msg == null) |
| | | { |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION for serverID=" + serverId |
| | | + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); |
| | | " reader IO EXCEPTION for serverID=" + serverId + |
| | | stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(), |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId())); |
| | | logError(message); |
| | | } catch (ClassNotFoundException e) |
| | | { |
| | |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CNF EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | " reader CNF EXCEPTION serverID=" + serverId + |
| | | stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | | */ |
| | | Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(message); |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | // Received a V1 PDU we do not need to support: |
| | | // we just trash the message and log the event for debug purpose |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + e.getMessage()); |
| | | } catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | " server reader EXCEPTION serverID=" + serverId + |
| | | stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | | */ |
| | | Message message = NOTE_READER_EXCEPTION.get(handler.toString()); |
| | | logError(message); |
| | | } |
| | | finally |
| | | } finally |
| | | { |
| | | /* |
| | | * The thread only exit the loop above is some error condition |
| | |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | | // ignore |
| | | } |
| | | replicationServerDomain.stopServer(handler); |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS":" LDAP") + |
| | | " server reader stopped for serverID=" + serverId); |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer() ? " RS" : " LDAP") + |
| | | " server reader stopped for serverID=" + serverId); |
| | | } |
| | | } |