| | |
| | | */ |
| | | private final Map<Integer, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Integer, DbHandler>(); |
| | | private ReplicationServer replicationServer; |
| | | /** The ReplicationServer that created the current instance. */ |
| | | private ReplicationServer localReplicationServer; |
| | | |
| | | /** GenerationId management. */ |
| | | private volatile long generationId = -1; |
| | |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ReplicationServerDomain. |
| | | * @param replicationServer the ReplicationServer that created this |
| | | * @param localReplicationServer the ReplicationServer that created this |
| | | * replicationServer cache. |
| | | */ |
| | | public ReplicationServerDomain( |
| | | String baseDn, ReplicationServer replicationServer) |
| | | public ReplicationServerDomain(String baseDn, |
| | | ReplicationServer localReplicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | this.localReplicationServer = localReplicationServer; |
| | | this.assuredTimeoutTimer = new Timer("Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + localReplicationServer.getServerId() |
| | | + ") assured timer for domain \"" + baseDn + "\"", true); |
| | | |
| | | DirectoryServer.registerMonitorProvider(this); |
| | |
| | | public void put(UpdateMsg update, ServerHandler sourceHandler) |
| | | throws IOException |
| | | { |
| | | |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | int id = cn.getServerId(); |
| | | int serverId = cn.getServerId(); |
| | | |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | | |
| | |
| | | { |
| | | // Unknown assured mode: should never happen |
| | | Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(replicationServer.getServerId()), |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | assuredMode.toString(), baseDn, update.toString()); |
| | | logError(errorMsg); |
| | | assuredMessage = false; |
| | |
| | | } |
| | | } |
| | | |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler; |
| | | synchronized (sourceDbHandlers) |
| | | if (!publishMessage(update, serverId)) |
| | | { |
| | | dbHandler = sourceDbHandlers.get(id); |
| | | if (dbHandler == null) |
| | | { |
| | | try |
| | | { |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | generationIdSavedStatus = true; |
| | | } catch (ChangelogException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | | * from at least one LDAP server. |
| | | * This replicationServer therefore can't do its job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | return; |
| | | } |
| | | sourceDbHandlers.put(id, dbHandler); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | // Publish the messages to the source handler |
| | | dbHandler.add(update); |
| | | |
| | | List<Integer> expectedServers = null; |
| | | if (assuredMessage) |
| | | { |
| | |
| | | // times out) |
| | | AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn); |
| | | assuredTimeoutTimer.schedule(assuredTimeoutTask, |
| | | replicationServer.getAssuredTimeout()); |
| | | localReplicationServer.getAssuredTimeout()); |
| | | // Purge timer every 100 treated messages |
| | | assuredTimeoutTimerPurgeCounter++; |
| | | if ((assuredTimeoutTimerPurgeCounter % 100) == 0) |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() + " for dn " + baseDn |
| | | + localReplicationServer.getReplicationPort() + " " + baseDn |
| | | + " " |
| | | + localReplicationServer.getServerId() + " for dn " + baseDn |
| | | + ", update " + update.getChangeNumber() |
| | | + " will not be sent to replication server " |
| | | + handler.getServerId() + " with generation id " |
| | |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | TRACER.debugInfo("In RS " + localReplicationServer.getServerId() |
| | | + " for dn " + baseDn + ", update " + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + handler.getServerId() + " as it is in full update"); |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean publishMessage(UpdateMsg update, int serverId) |
| | | { |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler; |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler == null) |
| | | { |
| | | try |
| | | { |
| | | dbHandler = localReplicationServer.newDbHandler(serverId, baseDn); |
| | | generationIdSavedStatus = true; |
| | | } catch (ChangelogException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | | * from at least one LDAP server. |
| | | * This replicationServer therefore can't do it's job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | localReplicationServer.shutdown(); |
| | | return false; |
| | | } |
| | | sourceDbHandlers.put(serverId, dbHandler); |
| | | } |
| | | } |
| | | |
| | | // Publish the messages to the source handler |
| | | dbHandler.add(update); |
| | | return true; |
| | | } |
| | | |
| | | private NotAssuredUpdateMsg addUpdate(ServerHandler handler, |
| | | UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate, |
| | | boolean assuredMessage, List<Integer> expectedServers) |
| | |
| | | UpdateMsg update, ServerHandler sourceHandler) throws IOException |
| | | { |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | byte groupId = replicationServer.getGroupId(); |
| | | byte groupId = localReplicationServer.getGroupId(); |
| | | byte sourceGroupId = sourceHandler.getGroupId(); |
| | | List<Integer> expectedServers = new ArrayList<Integer>(); |
| | | List<Integer> wrongStatusServers = new ArrayList<Integer>(); |
| | |
| | | ChangeNumber cn = update.getChangeNumber(); |
| | | boolean interestedInAcks = false; |
| | | byte safeDataLevel = update.getSafeDataLevel(); |
| | | byte groupId = replicationServer.getGroupId(); |
| | | byte groupId = localReplicationServer.getGroupId(); |
| | | byte sourceGroupId = sourceHandler.getGroupId(); |
| | | if (safeDataLevel < (byte) 1) |
| | | { |
| | | // Should never happen |
| | | Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get( |
| | | Integer.toString(replicationServer.getServerId()), |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | logError(errorMsg); |
| | | } else if (sourceGroupId == groupId |
| | |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(replicationServer.getServerId()), |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | cn.toString(), baseDn)); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | TRACER.debugInfo("In RS " + localReplicationServer.getServerId() |
| | | + " for "+ baseDn |
| | | + ", sending timeout for assured update with change " |
| | | + " number " + cn + " to server id " |
| | |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(replicationServer.getServerId()), |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | cn.toString(), baseDn)); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | |
| | | { |
| | | // looks like two connected LDAP servers have the same serverId |
| | | Message message = ERR_DUPLICATE_SERVER_ID.get( |
| | | replicationServer.getMonitorInstanceName(), |
| | | localReplicationServer.getMonitorInstanceName(), |
| | | directoryServers.get(handler.getServerId()).toString(), |
| | | handler.toString(), handler.getServerId()); |
| | | logError(message); |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the server handler " |
| | | + handler.getMonitorInstanceName()); |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | TRACER.debugInfo("In " |
| | | + localReplicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " is the last RS/DS to be stopped:" |
| | | + " stopping monitoring publisher"); |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + replicationServer.getMonitorInstanceName() |
| | | + localReplicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " is the last DS to be stopped: stopping status analyzer"); |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the message handler " |
| | | + handler.getMonitorInstanceName()); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " + baseDn |
| | | + " " + " mayResetGenerationId generationIdSavedStatus=" |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus=" |
| | | + generationIdSavedStatus); |
| | | } |
| | | |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " |
| | | + this.localReplicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " + " mayResetGenerationId skip RS" |
| | | + rsh.getMonitorInstanceName() + " that has different genId"); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " for "+ baseDn + " mayResetGenerationId RS" |
| | | + rsh.getMonitorInstanceName() |
| | | + " has servers connected to it" |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " |
| | | + this.localReplicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " |
| | | + " has servers connected to it - will not reset generationId"); |
| | | } |
| | |
| | | // looks like two replication servers have the same serverId |
| | | // log an error message and drop this connection. |
| | | Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get( |
| | | replicationServer.getMonitorInstanceName(), oldHandler. |
| | | localReplicationServer.getMonitorInstanceName(), oldHandler. |
| | | getServerAddressURL(), handler.getServerAddressURL(), |
| | | handler.getServerId()); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | |
| | | * and locks used by the ReplicationIterator. |
| | | * |
| | | * @param serverId Identifier of the server for which the iterator is created. |
| | | * @param changeNumber Starting point for the iterator. |
| | | * @param startAfterCN Starting point for the iterator. |
| | | * @return the created ReplicationIterator. Null when no DB is available |
| | | * for the provided server Id. |
| | | */ |
| | | public ReplicationIterator getChangelogIterator(int serverId, |
| | | ChangeNumber changeNumber) |
| | | ChangeNumber startAfterCN) |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | | if (handler == null) |
| | |
| | | ReplicationIterator it; |
| | | try |
| | | { |
| | | it = handler.generateIterator(changeNumber); |
| | | it = handler.generateIterator(startAfterCN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Processes a message coming from one server in the topology |
| | | * and potentially forwards it to one or all other servers. |
| | | * Processes a message coming from one server in the topology and potentially |
| | | * forwards it to one or all other servers. |
| | | * |
| | | * @param msg The message received and to be processed. |
| | | * @param senderHandler The server handler of the server that emitted |
| | | * the message. |
| | | * @param msg |
| | | * The message received and to be processed. |
| | | * @param msgEmitter |
| | | * The server handler of the server that emitted the message. |
| | | */ |
| | | public void process(RoutableMsg msg, ServerHandler senderHandler) |
| | | public void process(RoutableMsg msg, ServerHandler msgEmitter) |
| | | { |
| | | // Test the message for which a ReplicationServer is expected |
| | | // to be the destination |
| | |
| | | !(msg instanceof InitializeRcvAckMsg) && |
| | | !(msg instanceof EntryMsg) && |
| | | !(msg instanceof DoneMsg) && |
| | | (msg.getDestination() == this.replicationServer.getServerId())) |
| | | (msg.getDestination() == this.localReplicationServer.getServerId())) |
| | | { |
| | | if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg) msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | // If the request comes from a Directory Server we need to |
| | | // build the full list of all servers in the topology |
| | | // and send back a MonitorMsg with the full list of all the servers |
| | | // in the topology. |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | // Monitoring information requested by a DS |
| | | MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( |
| | | msg.getDestination(), msg.getSenderID(), monitorData); |
| | | |
| | | if (monitorMsg != null) |
| | | { |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // the connection was closed. |
| | | } |
| | | } |
| | | return; |
| | | } else |
| | | { |
| | | // Monitoring information requested by a RS |
| | | MonitorMsg monitorMsg = |
| | | createLocalTopologyMonitorMsg(msg.getDestination(), |
| | | msg.getSenderID()); |
| | | |
| | | if (monitorMsg != null) |
| | | { |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | | } catch (Exception e) |
| | | { |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( |
| | | Integer.toString(msg.getDestination()))); |
| | | } |
| | | } |
| | | } |
| | | replyWithMonitorMsg(msg, msgEmitter); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | MonitorMsg monitorMsg = (MonitorMsg) msg; |
| | | receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId()); |
| | | receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId()); |
| | | } else |
| | | { |
| | | logError(NOTE_ERR_ROUTING_TO_SERVER.get( |
| | | msg.getClass().getCanonicalName())); |
| | | |
| | | MessageBuilder mb1 = new MessageBuilder(); |
| | | mb1.append( |
| | | NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName())); |
| | | mb1.append("serverID:").append(msg.getDestination()); |
| | | ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage()); |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | | } catch (IOException ioe1) |
| | | { |
| | | // an error happened on the sender session trying to recover |
| | | // from an error on the receiver session. |
| | | // Not much more we can do at this point. |
| | | } |
| | | replyWithUnroutableMsgType(msgEmitter, msg); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | List<ServerHandler> servers = getDestinationServers(msg, senderHandler); |
| | | |
| | | if (servers.isEmpty()) |
| | | List<ServerHandler> servers = getDestinationServers(msg, msgEmitter); |
| | | if (!servers.isEmpty()) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | mb.append(" In Replication Server=").append( |
| | | this.replicationServer.getMonitorInstanceName()); |
| | | mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); |
| | | mb.append(" Details:routing table is empty"); |
| | | ErrorMsg errMsg = new ErrorMsg( |
| | | this.replicationServer.getServerId(), |
| | | msg.getSenderID(), |
| | | mb.toMessage()); |
| | | logError(mb.toMessage()); |
| | | forwardMsgToAllServers(msg, servers, msgEmitter); |
| | | } |
| | | else |
| | | { |
| | | replyWithUnreachablePeerMsg(msgEmitter, msg); |
| | | } |
| | | } |
| | | |
| | | private void replyWithMonitorMsg(RoutableMsg msg, ServerHandler msgEmitter) |
| | | { |
| | | /* |
| | | * If the request comes from a Directory Server we need to build the full |
| | | * list of all servers in the topology and send back a MonitorMsg with the |
| | | * full list of all the servers in the topology. |
| | | */ |
| | | if (msgEmitter.isDataServer()) |
| | | { |
| | | // Monitoring information requested by a DS |
| | | MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( |
| | | msg.getDestination(), msg.getSenderID(), monitorData); |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | | } catch (IOException ioe) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | /* |
| | | * An error happened trying to send an error msg to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | MessageBuilder mb2 = new MessageBuilder(); |
| | | mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); |
| | | mb2.append(stackTraceToSingleLineString(ioe)); |
| | | logError(mb2.toMessage()); |
| | | stopServer(senderHandler, false); |
| | | msgEmitter.send(monitorMsg); |
| | | } |
| | | } else |
| | | catch (IOException e) |
| | | { |
| | | // the connection was closed. |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (ServerHandler targetHandler : servers) |
| | | // Monitoring information requested by a RS |
| | | MonitorMsg monitorMsg = createLocalTopologyMonitorMsg( |
| | | msg.getDestination(), msg.getSenderID()); |
| | | |
| | | if (monitorMsg != null) |
| | | { |
| | | try |
| | | { |
| | | targetHandler.send(msg); |
| | | } catch (IOException ioe) |
| | | msgEmitter.send(monitorMsg); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | /* |
| | | * An error happened trying the send a routable message |
| | | * to its destination server. |
| | | * Send back an error to the originator of the message. |
| | | */ |
| | | MessageBuilder mb1 = new MessageBuilder(); |
| | | mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | mb1.append(" unroutable message =" + msg.getClass().getSimpleName()); |
| | | mb1.append(" Details: " + ioe.getLocalizedMessage()); |
| | | ErrorMsg errMsg = new ErrorMsg( |
| | | msg.getSenderID(), mb1.toMessage()); |
| | | logError(mb1.toMessage()); |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | | } catch (IOException ioe1) |
| | | { |
| | | // an error happened on the sender session trying to recover |
| | | // from an error on the receiver session. |
| | | // We don't have much solution left beside closing the sessions. |
| | | stopServer(senderHandler, false); |
| | | stopServer(targetHandler, false); |
| | | } |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg |
| | | .getDestination()))); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | private void replyWithUnroutableMsgType(ServerHandler msgEmitter, |
| | | RoutableMsg msg) |
| | | { |
| | | String msgClassname = msg.getClass().getCanonicalName(); |
| | | logError(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); |
| | | |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); |
| | | mb.append("serverID:").append(msg.getDestination()); |
| | | ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage()); |
| | | try |
| | | { |
| | | msgEmitter.send(errMsg); |
| | | } |
| | | catch (IOException ignored) |
| | | { |
| | | // an error happened on the sender session trying to recover |
| | | // from an error on the receiver session. |
| | | // Not much more we can do at this point. |
| | | } |
| | | } |
| | | |
| | | private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter, |
| | | RoutableMsg msg) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | mb.append(" In Replication Server=").append( |
| | | this.localReplicationServer.getMonitorInstanceName()); |
| | | mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); |
| | | mb.append(" Details:routing table is empty"); |
| | | final Message message = mb.toMessage(); |
| | | logError(message); |
| | | |
| | | ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(), |
| | | msg.getSenderID(), message); |
| | | try |
| | | { |
| | | msgEmitter.send(errMsg); |
| | | } |
| | | catch (IOException ignored) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | /* |
| | | * An error happened trying to send an error msg to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | MessageBuilder mb2 = new MessageBuilder(); |
| | | mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); |
| | | mb2.append(stackTraceToSingleLineString(ignored)); |
| | | logError(mb2.toMessage()); |
| | | stopServer(msgEmitter, false); |
| | | } |
| | | } |
| | | |
| | | private void forwardMsgToAllServers(RoutableMsg msg, |
| | | List<ServerHandler> servers, ServerHandler msgEmitter) |
| | | { |
| | | for (ServerHandler targetHandler : servers) |
| | | { |
| | | try |
| | | { |
| | | targetHandler.send(msg); |
| | | } catch (IOException ioe) |
| | | { |
| | | /* |
| | | * An error happened trying to send a routable message to its |
| | | * destination server. |
| | | * Send back an error to the originator of the message. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | mb.append(" unroutable message =" + msg.getClass().getSimpleName()); |
| | | mb.append(" Details: " + ioe.getLocalizedMessage()); |
| | | final Message message = mb.toMessage(); |
| | | logError(message); |
| | | |
| | | ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message); |
| | | try |
| | | { |
| | | msgEmitter.send(errMsg); |
| | | } catch (IOException ioe1) |
| | | { |
| | | // an error happened on the sender session trying to recover |
| | | // from an error on the receiver session. |
| | | // We don't have much solution left beside closing the sessions. |
| | | stopServer(msgEmitter, false); |
| | | stopServer(targetHandler, false); |
| | | } |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new monitor message including monitoring information for the |
| | |
| | | public MonitorMsg createGlobalTopologyMonitorMsg( |
| | | int sender, int destination, MonitorData monitorData) |
| | | { |
| | | MonitorMsg returnMsg = |
| | | new MonitorMsg(sender, destination); |
| | | final MonitorMsg returnMsg = new MonitorMsg(sender, destination); |
| | | |
| | | returnMsg.setReplServerDbState(getDbServerState()); |
| | | |
| | | // Add the informations about the Replicas currently in |
| | | // the topology. |
| | | // Add the information about the Replicas currently in the topology. |
| | | Iterator<Integer> it = monitorData.ldapIterator(); |
| | | while (it.hasNext()) |
| | | { |
| | |
| | | monitorData.getApproxFirstMissingDate(replicaId), true); |
| | | } |
| | | |
| | | // Add the information about the Replication Servers |
| | | // currently in the topology. |
| | | // Add the information about the RSs currently in the topology. |
| | | it = monitorData.rsIterator(); |
| | | while (it.hasNext()) |
| | | { |
| | |
| | | for (DataServerHandler lsh : this.directoryServers.values()) |
| | | { |
| | | monitorMsg.setServerState(lsh.getServerId(), |
| | | lsh.getServerState(), lsh.getApproxFirstMissingDate(), |
| | | true); |
| | | lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsh : this.replicationServers.values()) |
| | | { |
| | | monitorMsg.setServerState(rsh.getServerId(), |
| | | rsh.getServerState(), rsh.getApproxFirstMissingDate(), |
| | | false); |
| | | rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | |
| | | |
| | | stopAllServers(true); |
| | | |
| | | stopDbHandlers(); |
| | | shutdownDbHandlers(); |
| | | } |
| | | |
| | | /** |
| | | * Stop the dbHandlers . |
| | | */ |
| | | private void stopDbHandlers() |
| | | /** Shutdown all the dbHandlers. */ |
| | | private void shutdownDbHandlers() |
| | | { |
| | | // Shutdown the dbHandlers |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | |
| | | |
| | | // Create info for the local RS |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | RSInfo localRSInfo = toRSInfo(replicationServer, generationId); |
| | | rsInfos.add(localRSInfo); |
| | | |
| | | rsInfos.add(toRSInfo(localReplicationServer, generationId)); |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | | } |
| | | |
| | |
| | | */ |
| | | public TopologyMsg createTopologyMsgForDS(int destDsId) |
| | | { |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | |
| | | // Go through every DSs (except recipient of msg) |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | { |
| | | if (serverHandler.getServerId() == destDsId) |
| | |
| | | dsInfos.add(serverHandler.toDSInfo()); |
| | | } |
| | | |
| | | |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | // Add our own info (local RS) |
| | | RSInfo localRSInfo = toRSInfo(replicationServer, generationId); |
| | | rsInfos.add(localRSInfo); |
| | | rsInfos.add(toRSInfo(localReplicationServer, generationId)); |
| | | |
| | | // Go through every peer RSs (and get their connected DSs), also add info |
| | | // for RSs |
| | | for (ReplicationServerHandler serverHandler : replicationServers.values()) |
| | | { |
| | | // Put RS info |
| | | rsInfos.add(serverHandler.toRSInfo()); |
| | | |
| | | serverHandler.addDSInfos(dsInfos); |
| | |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | | stopDbHandlers(); |
| | | shutdownDbHandlers(); |
| | | } |
| | | try |
| | | { |
| | | replicationServer.clearGenerationId(baseDn); |
| | | localReplicationServer.clearGenerationId(baseDn); |
| | | } catch (Exception e) |
| | | { |
| | | // TODO: i18n |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " baseDN=" + baseDn + " isDegraded serverId=" + serverId |
| | | + " given local generation Id=" + this.generationId); |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " baseDN=" + baseDn + " Compute degradation of serverId=" |
| | | + serverId + " LS server generation Id=" + handler.getGenerationId()); |
| | | } |
| | |
| | | */ |
| | | public ReplicationServer getReplicationServer() |
| | | { |
| | | return replicationServer; |
| | | return localReplicationServer; |
| | | } |
| | | |
| | | /** |
| | |
| | | int serverId = rs.getServerId(); |
| | | |
| | | MonitorRequestMsg msg = new MonitorRequestMsg( |
| | | this.replicationServer.getServerId(), serverId); |
| | | this.localReplicationServer.getServerId(), serverId); |
| | | try |
| | | { |
| | | rs.send(msg); |
| | |
| | | // - from our own local db state |
| | | // - whatever they are directly or indirectly connected |
| | | ServerState dbServerState = getDbServerState(); |
| | | pendingMonitorData.setRSState(replicationServer.getServerId(), |
| | | pendingMonitorData.setRSState(localReplicationServer.getServerId(), |
| | | dbServerState); |
| | | for (int serverId : dbServerState) { |
| | | ChangeNumber storedCN = dbServerState.getChangeNumber(serverId); |
| | |
| | | while (rsidIterator.hasNext()) |
| | | { |
| | | int rsid = rsidIterator.next(); |
| | | if (rsid == replicationServer.getServerId()) |
| | | if (rsid == localReplicationServer.getServerId()) |
| | | { |
| | | // this is the latency of the remote RSi regarding the current RS |
| | | // let's update the fmd of my connected LS |
| | |
| | | if (statusAnalyzer == null) |
| | | { |
| | | int degradedStatusThreshold = |
| | | replicationServer.getDegradedStatusThreshold(); |
| | | localReplicationServer.getDegradedStatusThreshold(); |
| | | if (degradedStatusThreshold > 0) // 0 means no status analyzer |
| | | { |
| | | statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold); |
| | |
| | | if (monitoringPublisher == null) |
| | | { |
| | | long period = |
| | | replicationServer.getMonitoringPublisherPeriod(); |
| | | localReplicationServer.getMonitoringPublisherPeriod(); |
| | | if (period > 0) // 0 means no monitoring publisher |
| | | { |
| | | monitoringPublisher = new MonitoringPublisher(this, period); |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Replication server RS(" + replicationServer.getServerId() + ") " |
| | | + replicationServer.getServerURL() + ",cn=" |
| | | return "Replication server RS(" + localReplicationServer.getServerId() |
| | | + ") " + localReplicationServer.getServerURL() + ",cn=" |
| | | + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication"; |
| | | } |
| | | |
| | |
| | | // publish the server id and the port number. |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("replication-server-id", |
| | | String.valueOf(replicationServer.getServerId()))); |
| | | String.valueOf(localReplicationServer.getServerId()))); |
| | | attributes.add(Attributes.create("replication-server-port", |
| | | String.valueOf(replicationServer.getReplicationPort()))); |
| | | String.valueOf(localReplicationServer.getReplicationPort()))); |
| | | |
| | | // Add all the base DNs that are known by this replication server. |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | |
| | | MonitorData md = getDomainMonitorData(); |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChangesRS(replicationServer |
| | | long missingChanges = md.getMissingChangesRS(localReplicationServer |
| | | .getServerId()); |
| | | attributes.add(Attributes.create("missing-changes", |
| | | String.valueOf(missingChanges))); |
| | |
| | | } |
| | | */ |
| | | |
| | | boolean serverIdConnected = false; |
| | | if (directoryServers.containsKey(serverId)) |
| | | { |
| | | serverIdConnected = true; |
| | | } |
| | | else |
| | | { |
| | | // not directly connected |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | { |
| | | if (rsh.isRemoteLDAPServer(serverId)) |
| | | { |
| | | serverIdConnected = true; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | if (!serverIdConnected) |
| | | if (!isServerConnected(serverId)) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + "Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() + " Server " + serverId |
| | | + localReplicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + localReplicationServer.getServerId() + " Server " + serverId |
| | | + " is not considered for eligibility ... potentially down"); |
| | | } |
| | | continue; |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() |
| | | + localReplicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + localReplicationServer.getServerId() |
| | | + " getEligibleCN() returns result =" + eligibleCN); |
| | | } |
| | | return eligibleCN; |
| | | } |
| | | |
| | | private boolean isServerConnected(int serverId) |
| | | { |
| | | if (directoryServers.containsKey(serverId)) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | // not directly connected |
| | | for (ReplicationServerHandler rsHandler : replicationServers.values()) |
| | | { |
| | | if (rsHandler.isRemoteLDAPServer(serverId)) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp) |
| | |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG |
| | | .get("Replication Server " |
| | | + replicationServer.getReplicationPort() + " " |
| | | + baseDn + " " + replicationServer.getServerId())); |
| | | + localReplicationServer.getReplicationPort() + " " |
| | | + baseDn + " " + localReplicationServer.getServerId())); |
| | | stopServer(rsHandler, false); |
| | | } |
| | | } |