| | |
| | | // Push the message to the replication servers |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | /** |
| | | * Ignore updates to RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | if (isDifferentGenerationId(rsHandler.getGenerationId())) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + localReplicationServer.getServerId() + " for dn " + baseDn |
| | | + ", update " + update.getChangeNumber() |
| | | + " will not be sent to replication server " |
| | | + rsHandler.getServerId() + " with generation id " |
| | | + rsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | | notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the LDAP servers |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (dsHandler == sourceHandler) |
| | | { |
| | | continue; |
| | | } |
| | |
| | | * stop sending updates is interesting anyway. Not taking the RSD lock |
| | | * allows for better performance in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = dsHandler.getStatus(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS |
| | | || dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | |
| | | TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update " |
| | | + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " with generation id " |
| | | + dsHandler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | |
| | | TRACER.debugInfo("In RS " + localReplicationServer.getServerId() |
| | | + " for dn " + baseDn + ", update " + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + dsHandler.getServerId() + " as it is in full update"); |
| | | } |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | | notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(update); |
| | | } |
| | | } |
| | | |
| | |
| | | return true; |
| | | } |
| | | |
| | | private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler, |
| | | UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate, |
| | | boolean assuredMessage, List<Integer> expectedServers) |
| | | throws UnsupportedEncodingException |
| | |
| | | { |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination server |
| | | if (expectedServers.contains(sHandler.getServerId())) |
| | | { |
| | | sHandler.add(update); |
| | | } |
| | | else |
| | | { |
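| | | // This server is not expected to acknowledge the update, so send a |
| | | // not-assured version instead. The NotAssuredUpdateMsg is built lazily |
| | | // the first time it is needed and reused for the remaining servers. |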
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | sHandler.add(notAssuredUpdate); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | sHandler.add(update); |
| | | } |
| | | return notAssuredUpdate; |
| | | } |
| | |
| | | } |
| | | |
| | | // Look for DS eligible for assured |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (dsHandler == sourceHandler) |
| | | { |
| | | continue; |
| | | } |
| | | if (dsHandler.getGroupId() == groupId |
| | | // No ack expected from a DS with different group id |
| | | { |
| | | ServerStatus serverStatus = dsHandler.getStatus(); |
| | | if (serverStatus == ServerStatus.NORMAL_STATUS) |
| | | { |
| | | expectedServers.add(dsHandler.getServerId()); |
| | | } else if (serverStatus == ServerStatus.DEGRADED_STATUS) { |
| | | // No ack expected from a DS with wrong status |
| | | wrongStatusServers.add(dsHandler.getServerId()); |
| | | } |
| | | /* |
| | | * else |
| | |
| | | private void collectRSsEligibleForAssuredReplication(byte groupId, |
| | | List<Integer> expectedServers) |
| | | { |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | if (rsHandler.getGroupId() == groupId |
| | | // No ack expected from a RS with different group id |
| | | && isSameGenerationId(rsHandler.getGenerationId()) |
| | | // No ack expected from a RS with bad gen id |
| | | ) |
| | | { |
| | | expectedServers.add(rsHandler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void stopReplicationServers(Collection<String> replServerURLs) |
| | | { |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | if (replServerURLs.contains(rsHandler.getServerAddressURL())) |
| | | { |
| | | stopServer(rsHandler, false); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void stopAllServers(boolean shutdown) |
| | | { |
| | | // Close session with other replication servers |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | stopServer(rsHandler, shutdown); |
| | | } |
| | | |
| | | // Close session with other LDAP servers |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | stopServer(dsHandler, shutdown); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks whether it is already connected to a DS with the same id. |
| | | * |
| | | * @param dsHandler |
| | | * the DS we want to check |
| | | * @return true if this DS is already connected to the current server |
| | | */ |
| | | public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler) |
| | | { |
| | | if (connectedDSs.containsKey(dsHandler.getServerId())) |
| | | { |
| | | // looks like two connected LDAP servers have the same serverId |
| | | Message message = ERR_DUPLICATE_SERVER_ID.get( |
| | | localReplicationServer.getMonitorInstanceName(), |
| | | connectedDSs.get(dsHandler.getServerId()).toString(), |
| | | dsHandler.toString(), dsHandler.getServerId()); |
| | | logError(message); |
| | | return true; |
| | | } |
| | |
| | | /** |
| | | * Stop operations with a given server. |
| | | * |
| | | * @param sHandler the server for which we want to stop operations. |
| | | * @param shutdown A boolean indicating if the stop is due to a |
| | | * shutdown condition. |
| | | */ |
| | | public void stopServer(ServerHandler sHandler, boolean shutdown) |
| | | { |
| | | // TODO JNR merge with stopServer(MessageHandler) |
| | | if (debugEnabled()) |
| | |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the server handler " |
| | | + sHandler.getMonitorInstanceName()); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | |
| | | * the handler. So use a thread safe flag to know if the job must be done |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!sHandler.engageShutdown()) |
| | | // Only do this once (prevent other thread to enter here again) |
| | | { |
| | | if (!shutdown) |
| | |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + localReplicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " remote server " + sHandler.getMonitorInstanceName() |
| | | + " is the last RS/DS to be stopped:" |
| | | + " stopping monitoring publisher"); |
| | | } |
| | | stopMonitoringPublisher(); |
| | | } |
| | | |
| | | if (connectedRSs.containsKey(sHandler.getServerId())) |
| | | { |
| | | unregisterServerHandler(sHandler, shutdown, false); |
| | | } else if (connectedDSs.containsKey(sHandler.getServerId())) |
| | | { |
| | | // If this is the last DS for the domain, |
| | | // shutdown the status analyzer |
| | |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + localReplicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " remote server " + sHandler.getMonitorInstanceName() |
| | | + " is the last DS to be stopped: stopping status analyzer"); |
| | | } |
| | | stopStatusAnalyzer(); |
| | | } |
| | | unregisterServerHandler(sHandler, shutdown, true); |
| | | } else if (otherHandlers.contains(sHandler)) |
| | | { |
| | | unregisterOtherHandler(sHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | |
| | | } |
| | | } |
| | | |
| | | private void unregisterOtherHandler(MessageHandler mHandler) |
| | | { |
| | | unRegisterHandler(mHandler); |
| | | mHandler.shutdown(); |
| | | } |
| | | |
| | | private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, |
| | | boolean isDirectoryServer) |
| | | { |
| | | unregisterServerHandler(sHandler); |
| | | sHandler.shutdown(); |
| | | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | |
| | | { |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendTopoInfoToAllRSs(); |
| | | } |
| | | // Warn our DSs that a RS or DS has quit (does not use this |
| | | // handler as already removed from list) |
| | | sendTopoInfoToAllDSsExcept(null); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stop the handler. |
| | | * @param mHandler The handler to stop. |
| | | */ |
| | | public void stopServer(MessageHandler mHandler) |
| | | { |
| | | // TODO JNR merge with stopServer(ServerHandler, boolean) |
| | | if (debugEnabled()) |
| | |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the message handler " |
| | | + mHandler.getMonitorInstanceName()); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | |
| | | * the handler. So use a thread safe flag to know if the job must be done |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!mHandler.engageShutdown()) |
| | | // Only do this once (prevent other thread to enter here again) |
| | | { |
| | | try |
| | |
| | | |
| | | try |
| | | { |
| | | if (otherHandlers.contains(mHandler)) |
| | | { |
| | | unregisterOtherHandler(mHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | |
| | | /** |
| | | * Unregister this handler from the list of handlers registered to this |
| | | * domain. |
| | | * @param sHandler the provided handler to unregister. |
| | | */ |
| | | private void unregisterServerHandler(ServerHandler sHandler) |
| | | { |
| | | if (sHandler.isReplicationServer()) |
| | | { |
| | | connectedRSs.remove(sHandler.getServerId()); |
| | | } |
| | | else |
| | | { |
| | | connectedDSs.remove(sHandler.getServerId()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private void mayResetGenerationId() |
| | | { |
| | | String prefix = |
| | | "In RS " + this.localReplicationServer.getMonitorInstanceName() |
| | | + " for " + baseDn + " "; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus=" |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus=" |
| | | + generationIdSavedStatus); |
| | | } |
| | | |
| | | // If there are no more LDAP servers connected to this domain in the |
| | | // topology and the generationId has never been saved, then we can reset |
| | | // it and the next LDAP server to connect will become the new reference. |
| | | boolean ldapServersConnectedInTheTopology = false; |
| | | if (connectedDSs.isEmpty()) |
| | | { |
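| | | // No DS is connected locally: check whether a peer RS with the same |
| | | // generation id still has LDAP servers connected to it. |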
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | if (generationId != rsHandler.getGenerationId()) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.localReplicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " + " mayResetGenerationId skip RS" |
| | | + rsh.getMonitorInstanceName() + " that has different genId"); |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId skip RS " |
| | | + rsHandler.getMonitorInstanceName() |
| | | + " that has different genId"); |
| | | } |
| | | } |
| | | else if (rsHandler.hasRemoteLDAPServers()) |
| | | { |
| | | ldapServersConnectedInTheTopology = true; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(prefix + "mayResetGenerationId RS " |
| | | + rsHandler.getMonitorInstanceName() |
| | | + " has ldap servers connected to it" |
| | | + " - will not reset generationId"); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | ldapServersConnectedInTheTopology = true; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.localReplicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " |
| | | + " has servers connected to it - will not reset generationId"); |
| | | TRACER.debugInfo(prefix + "has ldap servers connected to it" |
| | | + " - will not reset generationId"); |
| | | } |
| | | } |
| | | |
| | | if (!ldapServersConnectedInTheTopology |
| | | && !this.generationIdSavedStatus |
| | | && generationId != -1) |
| | | { |
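| | | // No LDAP server is left anywhere in the topology and the generation |
| | | // id was never saved: clear it so that the next DS to connect becomes |
| | | // the new reference. |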
| | | changeGenerationId(-1, false); |
| | |
| | | /** |
| | | * Checks whether a remote RS is already connected to this hosting RS. |
| | | * |
| | | * @param rsHandler |
| | | * The handler for the remote RS. |
| | | * @return flag specifying whether the remote RS is already connected. |
| | | * @throws DirectoryException |
| | | * when a problem occurs. |
| | | */ |
| | | public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler) |
| | | throws DirectoryException |
| | | { |
| | | ReplicationServerHandler oldRsHandler = |
| | | connectedRSs.get(rsHandler.getServerId()); |
| | | if (oldRsHandler == null) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | if (oldRsHandler.getServerAddressURL().equals( |
| | | rsHandler.getServerAddressURL())) |
| | | { |
| | | // this is the same server, this means that our ServerStart messages |
| | | // have been sent at about the same time and 2 connections |
| | |
| | | // log an error message and drop this connection. |
| | | Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get( |
| | | localReplicationServer.getMonitorInstanceName(), |
| | | oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(), |
| | | rsHandler.getServerId()); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | |
| | | * This call is blocking when no update is available or when dependencies |
| | | * do not allow the next available change to be sent |
| | | * |
| | | * @param sHandler The server handler for the target directory server. |
| | | * |
| | | * @return the update that must be forwarded |
| | | */ |
| | | public UpdateMsg take(ServerHandler sHandler) |
| | | { |
| | | /* |
| | | * Get the balanced tree that we use to sort the changes to be |
| | |
| | | * So this method simply needs to check that dependencies are OK |
| | | * and update this replicaId RUV |
| | | */ |
| | | return sHandler.take(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ReplicationIterator getChangelogIterator(int serverId, |
| | | ChangeNumber startAfterCN) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler == null) |
| | | { |
| | | return null; |
| | | } |
| | |
| | | ReplicationIterator it; |
| | | try |
| | | { |
| | | it = dbHandler.generateIterator(startAfterCN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | */ |
| | | public long getCount(int serverId, ChangeNumber from, ChangeNumber to) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler != null) |
| | | { |
| | | return dbHandler.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the oldest missing change |
| | | for (DataServerHandler dsHandler : this.connectedDSs.values()) |
| | | { |
| | | monitorMsg.setServerState(dsHandler.getServerId(), dsHandler |
| | | .getServerState(), dsHandler.getApproxFirstMissingDate(), true); |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsHandler : this.connectedRSs.values()) |
| | | { |
| | | monitorMsg.setServerState(rsHandler.getServerId(), rsHandler |
| | | .getServerState(), rsHandler.getApproxFirstMissingDate(), false); |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | |
| | | } |
| | | |
| | | /** |
| | | * Send a TopologyMsg to all the connected directory servers in order to let |
| | | * them know the topology (every known DSs and RSs). |
| | | * |
| | | * @param notThisOne |
| | | * If not null, the topology message will not be sent to this DS. |
| | | */ |
| | | private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne) |
| | | { |
| | | for (DataServerHandler handler : connectedDSs.values()) |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | if (dsHandler != notThisOne) |
| | | // All except the supplied one |
| | | { |
| | | for (int i=1; i<=2; i++) |
| | | { |
| | | if (!dsHandler.shuttingDown() |
| | | && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | { |
| | | TopologyMsg topoMsg = |
| | | createTopologyMsgForDS(dsHandler.getServerId()); |
| | | try |
| | | { |
| | | dsHandler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | |
| | | { |
| | | Message message = |
| | | ERR_EXCEPTION_SENDING_TOPO_INFO |
| | | .get(baseDn, "directory", Integer.toString(handler |
| | | .get(baseDn, "directory", Integer.toString(dsHandler |
| | | .getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | |
| | | * Send a TopologyMsg to all the connected replication servers |
| | | * in order to let them know our connected LDAP servers. |
| | | */ |
| | | private void sendTopoInfoToAllRSs() |
| | | { |
| | | TopologyMsg topoMsg = createTopologyMsgForRS(); |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
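| | | // Try to send the topology message at most twice: stop on the first |
| | | // successful send and only log an error if the second attempt also |
| | | // fails with an IOException. |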
| | | for (int i=1; i<=2; i++) |
| | | { |
| | | if (!rsHandler.shuttingDown() |
| | | && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | { |
| | | try |
| | | { |
| | | rsHandler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (i == 2) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDn, "replication", |
| | | Integer.toString(rsHandler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | public TopologyMsg createTopologyMsgForRS() |
| | | { |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | |
| | | // Go through every DSs |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | dsInfos.add(dsHandler.toDSInfo()); |
| | | } |
| | | |
| | | // Create info for the local RS |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | rsInfos.add(toRSInfo(localReplicationServer, generationId)); |
| | | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | | } |
| | | |
| | |
| | | { |
| | | // Go through every DSs (except recipient of msg) |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | if (dsHandler.getServerId() == destDsId) |
| | | { |
| | | continue; |
| | | } |
| | | dsInfos.add(dsHandler.toDSInfo()); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | // Go through every peer RSs (and get their connected DSs), also add info |
| | | // for RSs |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | rsInfos.add(rsHandler.toRSInfo()); |
| | | |
| | | rsHandler.addDSInfos(dsInfos); |
| | | } |
| | | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | |
| | | // (consecutive to reset gen id message), we prefer advertising once for |
| | | // all after changes (fewer packets sent), here at the end of the reset msg |
| | | // treatment. |
| | | sendTopoInfoToAll(); |
| | | |
| | | logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId)); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | return; |
| | | } |
| | | |
| | | // Update every peer (RS/DS) with topology changes |
| | | sendTopoInfoToAllExcept(senderHandler); |
| | | |
| | | Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get( |
| | | senderHandler.getServerId(), baseDn, newStatus.toString()); |
| | |
| | | /** |
| | | * Change the status of a directory server according to the event generated |
| | | * from the status analyzer. |
| | | * @param dsHandler The handler of the directory server to update |
| | | * @param event The event to be used for new status computation |
| | | * @return True if we have been interrupted (must stop), false otherwise |
| | | */ |
| | | public boolean changeStatus(DataServerHandler dsHandler, |
| | | StatusMachineEvent event) |
| | | { |
| | | try |
| | | { |
| | |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDn |
| | | + " has been interrupted when" |
| | | + " trying to acquire domain lock for changing the status of DS " |
| | | + dsHandler.getServerId()); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | try |
| | | { |
| | | ServerStatus newStatus = ServerStatus.INVALID_STATUS; |
| | | ServerStatus oldStatus = dsHandler.getStatus(); |
| | | try |
| | | { |
| | | newStatus = dsHandler.changeStatus(event); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER |
| | | .get(baseDn, |
| | | Integer.toString(dsHandler.getServerId()), |
| | | e.getMessage())); |
| | | } |
| | | |
| | |
| | | return false; |
| | | } |
| | | |
| | | // Update every peer (RS/DS) with topology changes |
| | | sendTopoInfoToAllExcept(dsHandler); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Update every peer (RS/DS) with topology changes. |
| | | */ |
| | | public void sendTopoInfoToAll() |
| | | { |
| | | sendTopoInfoToAllExcept(null); |
| | | } |
| | | |
| | | /** |
| | | * Update every peer (RS/DS) with topology changes, except one DS. |
| | | * |
| | | * @param dsHandler |
| | | * if not null, the topology message will not be sent to this DS |
| | | */ |
| | | private void sendTopoInfoToAllExcept(DataServerHandler dsHandler) |
| | | { |
| | | sendTopoInfoToAllDSsExcept(dsHandler); |
| | | sendTopoInfoToAllRSs(); |
| | | } |
| | | |
| | | /** |
| | | * Clears the Db associated with that domain. |
| | | */ |
| | | public void clearDbs() |
| | |
| | | + " given local generation Id=" + this.generationId); |
| | | } |
| | | |
| | | ServerHandler sHandler = connectedRSs.get(serverId); |
| | | if (sHandler == null) |
| | | { |
| | | sHandler = connectedDSs.get(serverId); |
| | | if (sHandler == null) |
| | | { |
| | | return false; |
| | | } |
| | |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + this.localReplicationServer.getMonitorInstanceName() |
| | | + " baseDN=" + baseDn + " Compute degradation of serverId=" |
| | | + serverId + " LS server generation Id=" + handler.getGenerationId()); |
| | | + " baseDN=" + baseDn + " Compute degradation of serverId=" + serverId |
| | | + " LS server generation Id=" + sHandler.getGenerationId()); |
| | | } |
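| | | // A server is considered degraded when its generation id differs from |
| | | // the generation id of this domain. |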
| | | return sHandler.getGenerationId() != this.generationId; |
| | | } |
| | | |
| | | /** |
| | | * Process topology information received from a peer RS. |
| | | * @param topoMsg The topology message just received from the remote RS |
| | | * @param rsHandler The handler that received the message. |
| | | * @param allowResetGenId True to allow resetting the generation id |
| | | * (when called after the initial handshake) |
| | | * @throws IOException If an error occurred. |
| | | * @throws DirectoryException If an error occurred. |
| | | */ |
| | | public void receiveTopoInfoFromRS(TopologyMsg topoMsg, |
| | | ReplicationServerHandler rsHandler, boolean allowResetGenId) |
| | | throws IOException, DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + getLocalRSServerId() |
| | | + " Receiving TopologyMsg from " + handler.getServerId() |
| | | + " Receiving TopologyMsg from " + rsHandler.getServerId() |
| | | + " for baseDn " + baseDn + ":\n" + topoMsg); |
| | | } |
| | | |
| | |
| | | try |
| | | { |
| | | // Store DS connected to remote RS & update information about the peer RS |
| | | rsHandler.processTopoInfoFromRS(topoMsg); |
| | | |
| | | // Handle generation id |
| | | if (allowResetGenId) |
| | | { |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | if (generationId < 0) |
| | | { |
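| | | // This domain has no valid generation id yet: adopt the generation id |
| | | // of the remote RS. |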
| | | generationId = rsHandler.getGenerationId(); |
| | | } |
| | | } |
| | | |
| | | if (isDifferentGenerationId(rsHandler.getGenerationId())) |
| | | { |
| | | Message message = WARN_BAD_GENERATION_ID_FROM_RS.get( |
| | | rsHandler.getServerId(), |
| | | rsHandler.session.getReadableRemoteAddress(), |
| | | rsHandler.getGenerationId(), |
| | | baseDn, getLocalRSServerId(), generationId); |
| | | logError(message); |
| | | |
| | | ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(), |
| | | rsHandler.getServerId(), message); |
| | | rsHandler.send(errorMsg); |
| | | } |
| | | |
| | | /* |
| | | * Sends the currently known topology information to every connected |
| | | * DS we have. |
| | | */ |
| | | sendTopoInfoToAllDSsExcept(null); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | dbHandler.setPurgeDelay(delay); |
| | | } |
| | | } |
| | | |
| | |
| | | /** |
| | | * Starts the status analyzer for the domain if not already started. |
| | | */ |
| | | private void startStatusAnalyzer() |
| | | { |
| | | int degradedStatusThreshold = |
| | | localReplicationServer.getDegradedStatusThreshold(); |
| | |
| | | /** |
| | | * Starts the monitoring publisher for the domain if not already started. |
| | | */ |
| | | private void startMonitoringPublisher() |
| | | { |
| | | long period = localReplicationServer.getMonitoringPublisherPeriod(); |
| | | if (period > 0) // 0 means no monitoring publisher |
| | |
| | | |
| | | /** |
| | | * Register in the domain a handler that subscribes to changes. |
| | | * @param mHandler the provided subscribing handler. |
| | | */ |
| | | public void registerHandler(MessageHandler mHandler) |
| | | { |
| | | this.otherHandlers.add(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Unregister a handler from the domain. |
| | | * @param mHandler the provided unsubscribing handler. |
| | | * @return Whether this handler has been unregistered with success. |
| | | */ |
| | | public boolean unRegisterHandler(MessageHandler mHandler) |
| | | { |
| | | return this.otherHandlers.remove(mHandler); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | saThread.setDegradedStatusThreshold(degradedStatusThreshold); |
| | | } |
| | | else if (connectedDSs.size() > 0) |
| | | { |
| | | // Requested to start analyzers with provided threshold value |
| | | startStatusAnalyzer(); |
| | |
| | | { |
| | | mpThread.setPeriod(period); |
| | | } |
| | | else if (connectedDSs.size() > 0 || connectedRSs.size() > 0) |
| | | { |
| | | // Requested to start monitoring publishers with provided period value |
| | | startMonitoringPublisher(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Registers a DS handler into this domain and notifies the domain about the |
| | | * new DS. |
| | | * |
| | | * @param dsHandler |
| | | * The Directory Server Handler to register |
| | | */ |
| | | public void register(DataServerHandler dsHandler) |
| | | { |
| | | startStatusAnalyzer(); |
| | | startMonitoringPublisher(); |
| | | |
| | | // connected with new DS: store handler. |
| | | connectedDSs.put(dsHandler.getServerId(), dsHandler); |
| | | |
| | | // Tell peer RSs and DSs a new DS just connected to us |
| | | // No need to re-send TopologyMsg to this just new DS |
| | | sendTopoInfoToAllExcept(dsHandler); |
| | | } |
| | | |
| | | /** |
| | | * Registers the RS handler into this domain and notifies the domain. |
| | | * |
| | | * @param rsHandler |
| | | * The Replication Server Handler to register |
| | | */ |
| | | public void register(ReplicationServerHandler rsHandler) |
| | | { |
| | | startMonitoringPublisher(); |
| | | |
| | | // connected with new RS (either outgoing or incoming |
| | | // connection): store handler. |
| | | connectedRSs.put(rsHandler.getServerId(), rsHandler); |
| | | } |
| | | } |