| | |
| | | * we are currently publishing the first update in the balanced tree is the |
| | | * next change that we must push to this particular server. |
| | | */ |
| | | private final Map<Integer, DataServerHandler> directoryServers = |
| | | private final Map<Integer, DataServerHandler> connectedDSs = |
| | | new ConcurrentHashMap<Integer, DataServerHandler>(); |
| | | |
| | | /** |
| | |
| | | * in the balanced tree is the next change that we must push to this |
| | | * particular server. |
| | | */ |
| | | private final Map<Integer, ReplicationServerHandler> replicationServers = |
| | | private final Map<Integer, ReplicationServerHandler> connectedRSs = |
| | | new ConcurrentHashMap<Integer, ReplicationServerHandler>(); |
| | | |
| | | private final Queue<MessageHandler> otherHandlers = |
| | |
| | | */ |
| | | NotAssuredUpdateMsg notAssuredUpdate = null; |
| | | |
| | | /* |
| | | * Push the message to the replication servers |
| | | */ |
| | | // Push the message to the replication servers |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | for (ReplicationServerHandler handler : connectedRSs.values()) |
| | | { |
| | | /** |
| | | * Ignore updates to RS with bad gen id |
| | |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * Push the message to the LDAP servers |
| | | */ |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | // Push the message to the LDAP servers |
| | | for (DataServerHandler handler : connectedDSs.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | |
| | | } |
| | | |
| | | // Look for DS eligible for assured |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | for (DataServerHandler handler : connectedDSs.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | |
| | | private void collectRSsEligibleForAssuredReplication(byte groupId, |
| | | List<Integer> expectedServers) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | for (ReplicationServerHandler handler : connectedRSs.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId |
| | | // No ack expected from a RS with different group id |
| | |
| | | List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers(); |
| | | for (Integer serverId : serversInTimeout) |
| | | { |
| | | ServerHandler expectedDSInTimeout = directoryServers.get(serverId); |
| | | ServerHandler expectedRSInTimeout = |
| | | replicationServers.get(serverId); |
| | | ServerHandler expectedDSInTimeout = connectedDSs.get(serverId); |
| | | ServerHandler expectedRSInTimeout = connectedRSs.get(serverId); |
| | | if (expectedDSInTimeout != null) |
| | | { |
| | | if (safeRead) |
| | |
| | | */ |
| | | public void stopReplicationServers(Collection<String> replServers) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | for (ReplicationServerHandler handler : connectedRSs.values()) |
| | | { |
| | | if (replServers.contains(handler.getServerAddressURL())) |
| | | { |
| | |
| | | public void stopAllServers(boolean shutdown) |
| | | { |
| | | // Close session with other replication servers |
| | | for (ReplicationServerHandler serverHandler : replicationServers.values()) |
| | | for (ReplicationServerHandler serverHandler : connectedRSs.values()) |
| | | { |
| | | stopServer(serverHandler, shutdown); |
| | | } |
| | | |
| | | // Close session with other LDAP servers |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | for (DataServerHandler serverHandler : connectedDSs.values()) |
| | | { |
| | | stopServer(serverHandler, shutdown); |
| | | } |
| | |
| | | */ |
| | | public boolean checkForDuplicateDS(DataServerHandler handler) |
| | | { |
| | | if (directoryServers.containsKey(handler.getServerId())) |
| | | if (connectedDSs.containsKey(handler.getServerId())) |
| | | { |
| | | // looks like two connected LDAP servers have the same serverId |
| | | Message message = ERR_DUPLICATE_SERVER_ID.get( |
| | | localReplicationServer.getMonitorInstanceName(), |
| | | directoryServers.get(handler.getServerId()).toString(), |
| | | connectedDSs.get(handler.getServerId()).toString(), |
| | | handler.toString(), handler.getServerId()); |
| | | logError(message); |
| | | return false; |
| | |
| | | try |
| | | { |
| | | // Stop useless monitoring publisher if no more RS or DS in domain |
| | | if ( (directoryServers.size() + replicationServers.size() )== 1) |
| | | if ( (connectedDSs.size() + connectedRSs.size() )== 1) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | if (replicationServers.containsKey(handler.getServerId())) |
| | | if (connectedRSs.containsKey(handler.getServerId())) |
| | | { |
| | | unregisterServerHandler(handler); |
| | | handler.shutdown(); |
| | |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | } |
| | | } else if (directoryServers.containsKey(handler.getServerId())) |
| | | } else if (connectedDSs.containsKey(handler.getServerId())) |
| | | { |
| | | // If this is the last DS for the domain, |
| | | // shutdown the status analyzer |
| | | if (directoryServers.size() == 1) |
| | | if (connectedDSs.size() == 1) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | replicationServers.remove(handler.getServerId()); |
| | | connectedRSs.remove(handler.getServerId()); |
| | | } |
| | | else |
| | | { |
| | | directoryServers.remove(handler.getServerId()); |
| | | connectedDSs.remove(handler.getServerId()); |
| | | } |
| | | } |
| | | |
| | |
| | | // topology and the generationId has never been saved, then we can reset |
| | | // it and the next LDAP server to connect will become the new reference. |
| | | boolean lDAPServersConnectedInTheTopology = false; |
| | | if (directoryServers.isEmpty()) |
| | | if (connectedDSs.isEmpty()) |
| | | { |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | for (ReplicationServerHandler rsh : connectedRSs.values()) |
| | | { |
| | | if (generationId != rsh.getGenerationId()) |
| | | { |
| | |
| | | throws DirectoryException |
| | | { |
| | | ReplicationServerHandler oldHandler = |
| | | replicationServers.get(handler.getServerId()); |
| | | connectedRSs.get(handler.getServerId()); |
| | | if (oldHandler != null) |
| | | { |
| | | if (oldHandler.getServerAddressURL().equals( |
| | |
| | | public Set<String> getChangelogs() |
| | | { |
| | | Set<String> results = new LinkedHashSet<String>(); |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | for (ReplicationServerHandler handler : connectedRSs.values()) |
| | | { |
| | | results.add(handler.getServerAddressURL()); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns as a list of String the LDAP servers connected to us. |
| | | * Each string is the serverID of a connected LDAP server. |
| | | * |
| | | * @return The list of connected LDAP servers, one serverID string each |
| | | */ |
| | | public List<String> getConnectedLDAPservers() |
| | | { |
| | | List<String> results = new ArrayList<String>(0); |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | results.add(String.valueOf(handler.getServerId())); |
| | | } |
| | | return results; |
| | | } |
| | | |
| | | /** |
| | | * Creates and returns an iterator. |
| | | * When the iterator is not used anymore, the caller MUST call the |
| | | * ReplicationIterator.releaseCursor() method to free the resources |
| | |
| | | { |
| | | // Send to all replication servers with at least one remote |
| | | // server connected |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | for (ReplicationServerHandler rsh : connectedRSs.values()) |
| | | { |
| | | if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | |
| | | } |
| | | |
| | | // Sends to all connected LDAP servers |
| | | for (DataServerHandler destinationHandler : directoryServers.values()) |
| | | for (DataServerHandler destinationHandler : connectedDSs.values()) |
| | | { |
| | | // Don't loop on the sender |
| | | if (destinationHandler == senderHandler) |
| | |
| | | { |
| | | // Destination is one server |
| | | DataServerHandler destinationHandler = |
| | | directoryServers.get(msg.getDestination()); |
| | | connectedDSs.get(msg.getDestination()); |
| | | if (destinationHandler != null) |
| | | { |
| | | servers.add(destinationHandler); |
| | |
| | | // have the targeted server connected. |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | for (ReplicationServerHandler h : replicationServers.values()) |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | // Send to all replication servers with at least one remote |
| | | // server connected |
| | | if (h.isRemoteLDAPServer(msg.getDestination())) |
| | | if (rsHandler.isRemoteLDAPServer(msg.getDestination())) |
| | | { |
| | | servers.add(h); |
| | | servers.add(rsHandler); |
| | | } |
| | | } |
| | | } |
| | |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | for (DataServerHandler lsh : this.directoryServers.values()) |
| | | for (DataServerHandler lsh : this.connectedDSs.values()) |
| | | { |
| | | monitorMsg.setServerState(lsh.getServerId(), |
| | | lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsh : this.replicationServers.values()) |
| | | for (ReplicationServerHandler rsh : this.connectedRSs.values()) |
| | | { |
| | | monitorMsg.setServerState(rsh.getServerId(), |
| | | rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); |
| | |
| | | */ |
| | | public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne) |
| | | { |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | for (DataServerHandler handler : connectedDSs.values()) |
| | | { |
| | | if (notThisOne == null || handler != notThisOne) |
| | | // All except passed one |
| | |
| | | public void buildAndSendTopoInfoToRSs() |
| | | { |
| | | TopologyMsg topoMsg = createTopologyMsgForRS(); |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | for (ReplicationServerHandler handler : connectedRSs.values()) |
| | | { |
| | | for (int i=1; i<=2; i++) |
| | | { |
| | |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | |
| | | // Go through every DSs |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | for (DataServerHandler serverHandler : connectedDSs.values()) |
| | | { |
| | | dsInfos.add(serverHandler.toDSInfo()); |
| | | } |
| | |
| | | { |
| | | // Go through every DSs (except recipient of msg) |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | for (DataServerHandler serverHandler : connectedDSs.values()) |
| | | { |
| | | if (serverHandler.getServerId() == destDsId) |
| | | { |
| | |
| | | |
| | | // Go through every peer RSs (and get their connected DSs), also add info |
| | | // for RSs |
| | | for (ReplicationServerHandler serverHandler : replicationServers.values()) |
| | | for (ReplicationServerHandler serverHandler : connectedRSs.values()) |
| | | { |
| | | rsInfos.add(serverHandler.toRSInfo()); |
| | | |
| | |
| | | |
| | | // If we are the first replication server warned, |
| | | // then forwards the reset message to the remote replication servers |
| | | for (ServerHandler rsHandler : replicationServers.values()) |
| | | for (ServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | try |
| | | { |
| | |
| | | |
| | | // Change status of the connected DSs according to the requested new |
| | | // reference generation id |
| | | for (DataServerHandler dsHandler : directoryServers.values()) |
| | | for (DataServerHandler dsHandler : connectedDSs.values()) |
| | | { |
| | | try |
| | | { |
| | |
| | | // TODO: i18n |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), |
| | | e.getMessage() + " " + |
| | | stackTraceToSingleLineString(e))); |
| | | e.getMessage() + " " + stackTraceToSingleLineString(e))); |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | |
| | | + " given local generation Id=" + this.generationId); |
| | | } |
| | | |
| | | ServerHandler handler = replicationServers.get(serverId); |
| | | ServerHandler handler = connectedRSs.get(serverId); |
| | | if (handler == null) |
| | | { |
| | | handler = directoryServers.get(serverId); |
| | | handler = connectedDSs.get(serverId); |
| | | if (handler == null) |
| | | { |
| | | return false; |
| | |
| | | initializePendingMonitorData(); |
| | | |
| | | // Send the monitor requests to the connected replication servers. |
| | | for (ReplicationServerHandler rs : replicationServers.values()) |
| | | for (ReplicationServerHandler rs : connectedRSs.values()) |
| | | { |
| | | // Add server ID to pending table. |
| | | int serverId = rs.getServerId(); |
| | |
| | | // So for a given DS connected we can take the state and the max from |
| | | // the DS/state. |
| | | |
| | | for (ServerHandler ds : directoryServers.values()) |
| | | for (ServerHandler ds : connectedDSs.values()) |
| | | { |
| | | int serverID = ds.getServerId(); |
| | | |
| | | // the state comes from the state stored in the SH |
| | | ServerState dsState = ds.getServerState() |
| | | .duplicate(); |
| | | ServerState dsState = ds.getServerState().duplicate(); |
| | | |
| | | // the max CN sent by that LS also comes from the SH |
| | | ChangeNumber maxcn = dsState.getChangeNumber(serverID); |
| | |
| | | pendingMonitorData.setRSState(msg.getSenderID(), replServerState); |
| | | |
| | | // Store the remote LDAP servers states |
| | | Iterator<Integer> lsidIterator = msg.ldapIterator(); |
| | | while (lsidIterator.hasNext()) |
| | | Iterator<Integer> dsServerIdIterator = msg.ldapIterator(); |
| | | while (dsServerIdIterator.hasNext()) |
| | | { |
| | | int sid = lsidIterator.next(); |
| | | ServerState dsServerState = msg.getLDAPServerState(sid); |
| | | int dsServerId = dsServerIdIterator.next(); |
| | | ServerState dsServerState = msg.getLDAPServerState(dsServerId); |
| | | pendingMonitorData.setMaxCNs(dsServerState); |
| | | pendingMonitorData.setLDAPServerState(sid, dsServerState); |
| | | pendingMonitorData.setFirstMissingDate(sid, |
| | | msg.getLDAPApproxFirstMissingDate(sid)); |
| | | pendingMonitorData.setLDAPServerState(dsServerId, dsServerState); |
| | | pendingMonitorData.setFirstMissingDate(dsServerId, |
| | | msg.getLDAPApproxFirstMissingDate(dsServerId)); |
| | | } |
| | | |
| | | // Process the latency reported by the remote RSi on its connections |
| | | // to the other RSes |
| | | Iterator<Integer> rsidIterator = msg.rsIterator(); |
| | | while (rsidIterator.hasNext()) |
| | | Iterator<Integer> rsServerIdIterator = msg.rsIterator(); |
| | | while (rsServerIdIterator.hasNext()) |
| | | { |
| | | int rsid = rsidIterator.next(); |
| | | if (rsid == localReplicationServer.getServerId()) |
| | | int rsServerId = rsServerIdIterator.next(); |
| | | long newFmd = msg.getRSApproxFirstMissingDate(rsServerId); |
| | | if (rsServerId == localReplicationServer.getServerId()) |
| | | { |
| | | // this is the latency of the remote RSi regarding the current RS |
| | | // let's update the fmd of my connected LS |
| | | for (ServerHandler connectedlsh : directoryServers |
| | | .values()) |
| | | for (DataServerHandler connectedDS : connectedDSs.values()) |
| | | { |
| | | int connectedlsid = connectedlsh.getServerId(); |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd); |
| | | int connectedServerId = connectedDS.getServerId(); |
| | | pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // this is the latency of the remote RSi regarding another RSj |
| | | // let's update the latency of the LSes connected to RSj |
| | | ReplicationServerHandler rsjHdr = replicationServers.get(rsid); |
| | | ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId); |
| | | if (rsjHdr != null) |
| | | { |
| | | for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds()) |
| | | for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds()) |
| | | { |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | pendingMonitorData.setFirstMissingDate(remotelsid, newfmd); |
| | | pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd); |
| | | } |
| | | } |
| | | } |
| | |
| | | catch (RuntimeException e) |
| | | { |
| | | // FIXME: do we really expect these??? |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e |
| | | .getMessage() + stackTraceToSingleLineString(e))); |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get( |
| | | e.getMessage() + stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | |
| | | */ |
| | | public Map<Integer, DataServerHandler> getConnectedDSs() |
| | | { |
| | | return directoryServers; |
| | | return connectedDSs; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Map<Integer, ReplicationServerHandler> getConnectedRSs() |
| | | { |
| | | return replicationServers; |
| | | return connectedRSs; |
| | | } |
| | | |
| | | |
| | |
| | | result.update(mostRecentDbCN); |
| | | } |
| | | } catch (Exception e) { |
| | | Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get( |
| | | " " + stackTraceToSingleLineString(e)); |
| | | logError(errMessage); |
| | | logError(ERR_WRITER_UNEXPECTED_EXCEPTION |
| | | .get(stackTraceToSingleLineString(e))); |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | |
| | | |
| | | private boolean isServerConnected(int serverId) |
| | | { |
| | | if (directoryServers.containsKey(serverId)) |
| | | if (connectedDSs.containsKey(serverId)) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | // not directly connected |
| | | for (ReplicationServerHandler rsHandler : replicationServers.values()) |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | if (rsHandler.isRemoteLDAPServer(serverId)) |
| | | { |
| | |
| | | { |
| | | // If we are the first replication server warned, |
| | | // then forwards the message to the remote replication servers |
| | | for (ReplicationServerHandler rsHandler : replicationServers.values()) |
| | | for (ReplicationServerHandler rsHandler : connectedRSs.values()) |
| | | { |
| | | try |
| | | { |