| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class implements the part of the replicationServer that is reading |
| | | * the connection from the LDAP servers to get all the updates that |
| | |
| | | public ServerReader(Session session, ServerHandler handler) |
| | | { |
| | | super("Replication server RS(" + handler.getReplicationServerId() |
| | | + ") reading from " + handler.toString() + " at " |
| | | + ") reading from " + handler + " at " |
| | | + session.getReadableRemoteAddress()); |
| | | this.session = session; |
| | | this.handler = handler; |
| | |
| | | Message errMessage = null; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " starting"); |
| | | TRACER.debugInfo(getName() + " starting"); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | |
| | | |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | | handler.checkWindow(); |
| | | handler.processAck(ack); |
| | | handler.processAck((AckMsg) msg); |
| | | } else if (msg instanceof UpdateMsg) |
| | | { |
| | | UpdateMsg updateMsg = (UpdateMsg) msg; |
| | | |
| | | boolean filtered = false; |
| | | /* Ignore updates in some cases */ |
| | | // Ignore updates in some cases |
| | | if (handler.isDataServer()) |
| | | { |
| | | /** |
| | |
| | | * better performance in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | if (dsStatus == BAD_GEN_ID_STATUS |
| | | || dsStatus == FULL_UPDATE_STATUS) |
| | | { |
| | | long referenceGenerationId = handler.getReferenceGenId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | if (dsStatus == BAD_GEN_ID_STATUS) |
| | | logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get( |
| | | handler.getReplicationServerId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | updateMsg.getChangeNumber().toString(), |
| | | handler.getBaseDN(), handler.getServerId(), |
| | | session.getReadableRemoteAddress(), |
| | | handler.getGenerationId(), |
| | | referenceGenerationId)); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | if (dsStatus == FULL_UPDATE_STATUS) |
| | | logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get( |
| | | handler.getReplicationServerId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | updateMsg.getChangeNumber().toString(), |
| | | handler.getBaseDN(), handler.getServerId(), |
| | | session.getReadableRemoteAddress())); |
| | | filtered = true; |
| | |
| | | * Ignore updates from RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId =handler.getReferenceGenId(); |
| | | if ((referenceGenerationId > 0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | long referenceGenerationId = handler.getReferenceGenId(); |
| | | if (referenceGenerationId > 0 |
| | | && referenceGenerationId != handler.getGenerationId()) |
| | | { |
| | | logError( |
| | | WARN_IGNORING_UPDATE_FROM_RS.get( |
| | | handler.getReplicationServerId(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | updateMsg.getChangeNumber().toString(), |
| | | handler.getBaseDN(), |
| | | handler.getServerId(), |
| | | session.getReadableRemoteAddress(), |
| | |
| | | |
| | | if (!filtered) |
| | | { |
| | | UpdateMsg update = (UpdateMsg) msg; |
| | | handler.decAndCheckWindow(); |
| | | handler.put(update); |
| | | handler.put(updateMsg); |
| | | } |
| | | } else if (msg instanceof WindowMsg) |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } else if (msg instanceof InitializeRequestMsg) |
| | | handler.updateWindow((WindowMsg) msg); |
| | | } else if (msg instanceof RoutableMsg) |
| | | { |
| | | InitializeRequestMsg initializeMsg = |
| | | (InitializeRequestMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | | InitializeRcvAckMsg initializeRcvAckMsg = |
| | | (InitializeRcvAckMsg) msg; |
| | | handler.process(initializeRcvAckMsg); |
| | | } else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg) msg; |
| | | handler.process(entryMsg); |
| | | } else if (msg instanceof DoneMsg) |
| | | { |
| | | DoneMsg doneMsg = (DoneMsg) msg; |
| | | handler.process(doneMsg); |
| | | } else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg) msg; |
| | | handler.process(errorMsg); |
| | | handler.process((RoutableMsg) msg); |
| | | } else if (msg instanceof ResetGenerationIdMsg) |
| | | { |
| | | ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; |
| | | handler.processResetGenId(genIdMsg); |
| | | handler.processResetGenId((ResetGenerationIdMsg) msg); |
| | | } else if (msg instanceof WindowProbeMsg) |
| | | { |
| | | WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; |
| | | handler.process(windowProbeMsg); |
| | | handler.replyToWindowProbe(); |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | ReplicationServerHandler rsh = (ReplicationServerHandler)handler; |
| | | rsh.receiveTopoInfoFromRS(topoMsg); |
| | | rsh.receiveTopoInfoFromRS((TopologyMsg) msg); |
| | | } else if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; |
| | |
| | | csMsg.toString()); |
| | | logError(errMessage); |
| | | } |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | MonitorRequestMsg replServerMonitorRequestMsg = |
| | | (MonitorRequestMsg) msg; |
| | | handler.process(replServerMonitorRequestMsg); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; |
| | | handler.process(replServerMonitorMsg); |
| | | } else if (msg instanceof ChangeTimeHeartbeatMsg) |
| | | { |
| | | ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg; |
| | | handler.process(cthbMsg); |
| | | handler.process((ChangeTimeHeartbeatMsg) msg); |
| | | } else if (msg instanceof StopMsg) |
| | | { |
| | | // The peer server is disconnecting cleanly: return from here so that |
| | | // the finally block properly closes the server handler. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(handler.toString() + " has properly " + |
| | | "disconnected from this replication server " + |
| | | Integer.toString(handler.getReplicationServerId())); |
| | | TRACER.debugInfo(handler |
| | | + " has properly disconnected from this replication server " |
| | | + handler.getReplicationServerId()); |
| | | } |
| | | return; |
| | | } else if (msg == null) |
| | |
| | | // Received a V1 PDU we do not need to support: |
| | | // we just trash the message and log the event for debug purpose, |
| | | // then continue receiving messages. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | logException(e); |
| | | } |
| | | } |
| | | } |
| | |
| | | * Log a message and exit from this loop |
| | | * So that this handler is stopped. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | logException(e); |
| | | if (!handler.shuttingDown()) |
| | | { |
| | | if (handler.isDataServer()) |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " " + stackTraceToSingleLineString(e)); |
| | | logException(e); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.getName() + " closing the session"); |
| | | TRACER.debugInfo("In " + getName() + " closing the session"); |
| | | } |
| | | session.close(); |
| | | handler.doStop(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " stopped " + errMessage); |
| | | TRACER.debugInfo(getName() + " stopped: " + errMessage); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void logException(Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getName() + " " + stackTraceToSingleLineString(e)); |
| | | } |
| | | } |