| | |
| | | { |
| | | while (true) |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | | try |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | | /* |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | | } |
| | | */ |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | | handler.checkWindow(); |
| | | replicationServerDomain.ack(ack, serverId); |
| | | } else if (msg instanceof UpdateMsg) |
| | | { |
| | | boolean filtered = false; |
| | | /* Ignore updates in some cases */ |
| | | if (handler.isLDAPserver()) |
| | | /* |
| | | if (debugEnabled()) |
| | | { |
| | | /** |
| | | * Ignore updates from DS in bad BAD_GENID_STATUS or |
| | | * FULL_UPDATE_STATUS |
| | | * |
| | | * The RSD lock should not be taken here as it is acceptable to have |
| | | * a delay between the time the server has a wrong status and the |
| | | * fact we detect it: the updates that succeed to pass during this |
| | | * time will have no impact on remote server. But it is interesting |
| | | * to not saturate uselessly the network if the updates are not |
| | | * necessary so this check to stop sending updates is interesting |
| | | * anyway. Not taking the RSD lock allows to have better |
| | | * performances in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | | } |
| | | */ |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | | handler.checkWindow(); |
| | | replicationServerDomain.ack(ack, serverId); |
| | | } else if (msg instanceof UpdateMsg) |
| | | { |
| | | boolean filtered = false; |
| | | /* Ignore updates in some cases */ |
| | | if (handler.isLDAPserver()) |
| | | { |
| | | /** |
| | | * Ignore updates from DS in bad BAD_GENID_STATUS or |
| | | * FULL_UPDATE_STATUS |
| | | * |
| | | * The RSD lock should not be taken here as it is acceptable to |
| | | * have a delay between the time the server has a wrong status and |
| | | * the fact we detect it: the updates that succeed to pass during |
| | | * this time will have no impact on remote server. But it is |
| | | * interesting to not saturate uselessly the network if the |
| | | * updates are not necessary so this check to stop sending updates |
| | | * is interesting anyway. Not taking the RSD lock allows to have |
| | | * better performances in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | { |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get( |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get( |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()))); |
| | | filtered = true; |
| | | } |
| | | } else |
| | | { |
| | | /** |
| | | * Ignore updates from RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get( |
| | | if ((referenceGenerationId > 0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM_RS.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()))); |
| | | filtered = true; |
| | | filtered = true; |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | /** |
| | | * Ignore updates from RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId > 0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM_RS.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | ((UpdateMsg) msg).getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(referenceGenerationId), |
| | | Long.toString(handler.getGenerationId()))); |
| | | filtered = true; |
| | | } |
| | | } |
| | | |
| | | if (!filtered) |
| | | if (!filtered) |
| | | { |
| | | UpdateMsg update = (UpdateMsg) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationServerDomain.put(update, handler); |
| | | } |
| | | } else if (msg instanceof WindowMsg) |
| | | { |
| | | UpdateMsg update = (UpdateMsg) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationServerDomain.put(update, handler); |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } else if (msg instanceof InitializeRequestMsg) |
| | | { |
| | | InitializeRequestMsg initializeMsg = |
| | | (InitializeRequestMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg) msg; |
| | | handler.process(entryMsg); |
| | | } else if (msg instanceof DoneMsg) |
| | | { |
| | | DoneMsg doneMsg = (DoneMsg) msg; |
| | | handler.process(doneMsg); |
| | | } else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg) msg; |
| | | handler.process(errorMsg); |
| | | } else if (msg instanceof ResetGenerationIdMsg) |
| | | { |
| | | ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; |
| | | replicationServerDomain.resetGenerationId(handler, genIdMsg); |
| | | } else if (msg instanceof WindowProbeMsg) |
| | | { |
| | | WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; |
| | | handler.process(windowProbeMsg); |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, |
| | | handler, true); |
| | | } else if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; |
| | | replicationServerDomain.processNewStatus(handler, csMsg); |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | MonitorRequestMsg replServerMonitorRequestMsg = |
| | | (MonitorRequestMsg) msg; |
| | | handler.process(replServerMonitorRequestMsg); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; |
| | | handler.process(replServerMonitorMsg); |
| | | } else if (msg == null) |
| | | { |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the conenction. |
| | | */ |
| | | Message message = NOTE_READER_NULL_MSG.get(handler.toString()); |
| | | logError(message); |
| | | return; |
| | | } |
| | | } else if (msg instanceof WindowMsg) |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } else if (msg instanceof InitializeRequestMsg) |
| | | { |
| | | InitializeRequestMsg initializeMsg = |
| | | (InitializeRequestMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg; |
| | | handler.process(initializeMsg); |
| | | } else if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg) msg; |
| | | handler.process(entryMsg); |
| | | } else if (msg instanceof DoneMsg) |
| | | { |
| | | DoneMsg doneMsg = (DoneMsg) msg; |
| | | handler.process(doneMsg); |
| | | } else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg) msg; |
| | | handler.process(errorMsg); |
| | | } else if (msg instanceof ResetGenerationIdMsg) |
| | | { |
| | | ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; |
| | | replicationServerDomain.resetGenerationId(handler, genIdMsg); |
| | | } else if (msg instanceof WindowProbeMsg) |
| | | { |
| | | WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; |
| | | handler.process(windowProbeMsg); |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true); |
| | | } else if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; |
| | | replicationServerDomain.processNewStatus(handler, csMsg); |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | MonitorRequestMsg replServerMonitorRequestMsg = |
| | | (MonitorRequestMsg) msg; |
| | | handler.process(replServerMonitorRequestMsg); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; |
| | | handler.process(replServerMonitorMsg); |
| | | } else if (msg == null) |
| | | { |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the conenction. |
| | | */ |
| | | Message message = NOTE_READER_NULL_MSG.get(handler.toString()); |
| | | logError(message); |
| | | return; |
| | | // Received a V1 PDU we do not need to support: |
| | | // we just trash the message and log the event for debug purpose, |
| | | // then continue receiving messages. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + e.getMessage()); |
| | | } |
| | | } |
| | | } catch (IOException e) |
| | |
| | | */ |
| | | Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(message); |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | // Received a V1 PDU we do not need to support: |
| | | // we just trash the message and log the event for debug purpose |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + ":" + e.getMessage()); |
| | | } catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |