| | |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | |
| | | |
| | | /* Monitor data management */ |
| | | |
| | | // TODO: The remote monitor data cache lifetime (500 ms) should be configurable |
| | | private long remoteMonitorDataLifeTime = 500; |
| | | // TODO: The monitor data cache lifetime (500 ms) should be configurable |
| | | private long monitorDataLifeTime = 500; |
| | | |
| | | /* Search op on monitor data is processed by a worker thread. |
| | | * Requests are sent to the other RS, and responses are received by the |
| | |
| | | */ |
| | | Semaphore remoteMonitorResponsesSemaphore; |
| | | |
| | | /* The date (TimeThread time) after which the cached remote monitor data must be renewed */ |
| | | private long validityDate = 0; |
| | | |
| | | // For each LDAP server, its server state |
| | | private HashMap<Short, ServerState> LDAPStates = |
| | | new HashMap<Short, ServerState>(); |
| | | |
| | | // For each LDAP server, the last CN it published |
| | | private HashMap<Short, ChangeNumber> maxCNs = |
| | | new HashMap<Short, ChangeNumber>(); |
| | | |
| | | // For each LDAP server, an approximation of the date of the first missing |
| | | // change |
| | | private HashMap<Short, Long> approxFirstMissingDate = |
| | | new HashMap<Short, Long>(); |
| | | /** |
| | | * The monitor data consolidated over the topology. |
| | | */ |
| | | private MonitorData monitorData = new MonitorData(); |
| | | private MonitorData wrkMonitorData; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Created Cache for " + baseDn + " " + |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Add an update that has been received to the list of |
| | |
| | | { |
| | | replicationServers.remove(handler.getServerId()); |
| | | handler.stopHandler(); |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | } |
| | | } |
| | | else |
| | |
| | | { |
| | | connectedServers.remove(handler.getServerId()); |
| | | handler.stopHandler(); |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | } |
| | | } |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param serverId Identifier of the server for which the iterator is created. |
| | | * @param changeNumber Starting point for the iterator. |
| | | * @return the created ReplicationIterator. |
| | | * @return the created ReplicationIterator. Null when no DB is available |
| | | * for the provided server Id. |
| | | */ |
| | | public ReplicationIterator getChangelogIterator(short serverId, |
| | | ChangeNumber changeNumber) |
| | |
| | | { |
| | | return handler.generateIterator(changeNumber); |
| | | } |
| | | catch (Exception e) { |
| | | catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void process(RoutableMessage msg, ServerHandler senderHandler) |
| | | { |
| | | |
| | | // Test the message for which a ReplicationServer is expected |
| | | // to be the destination |
| | | if (msg.getDestination() == this.replicationServer.getServerId()) |
| | |
| | | replServerMonitorRequestMsg.getDestination(), |
| | | replServerMonitorRequestMsg.getsenderID()); |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerState(this.getDbServerState()); |
| | | |
| | | // Populate for each connected LDAP Server |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | for (ServerHandler lsh : this.connectedServers.values()) |
| | | { |
| | | monitorMsg.setLDAPServerState( |
| | | monitorMsg.setServerState( |
| | | lsh.getServerId(), |
| | | lsh.getServerState(), |
| | | lsh.getApproxFirstMissingDate()); |
| | | lsh.getApproxFirstMissingDate(), |
| | | true); |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ServerHandler rsh : this.replicationServers.values()) |
| | | { |
| | | monitorMsg.setServerState( |
| | | rsh.getServerId(), |
| | | rsh.getServerState(), |
| | | rsh.getApproxFirstMissingDate(), |
| | | false); |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerDbState(this.getDbServerState()); |
| | | |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | |
| | | } |
| | | } |
| | | |
| | | /* |
| | | /* ======================= |
| | | * Monitor Data generation |
| | | * ======================= |
| | | */ |
| | | |
| | | /** |
| | | * Retrieves the remote monitor data. |
| | | * |
| | | * Retrieves the global monitor data. |
| | | * @return The monitor data. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | protected void retrievesRemoteMonitorData() |
| | | synchronized protected MonitorData getMonitorData() |
| | | throws DirectoryException |
| | | { |
| | | if (validityDate > TimeThread.getTime()) |
| | | if (monitorData.getBuildDate() + monitorDataLifeTime |
| | | > TimeThread.getTime()) |
| | | { |
| | | // The current data are still valid. No need to renew them. |
| | | return; |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " getRemoteMonitorData in cache"); |
| | | // The current data are still valid. No need to renew them. |
| | | // FIXME |
| | | return null; |
| | | } |
| | | |
| | | // Clean |
| | | this.LDAPStates.clear(); |
| | | this.maxCNs.clear(); |
| | | |
| | | // Init the maxCNs of our direct LDAP servers from our own dbstate |
| | | for (ServerHandler rs : connectedServers.values()) |
| | | wrkMonitorData = new MonitorData(); |
| | | synchronized(wrkMonitorData) |
| | | { |
| | | short serverID = rs.getServerId(); |
| | | ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID); |
| | | if (cn == null) |
| | | { |
| | | // we have nothing in db for that server |
| | | cn = new ChangeNumber(0, 0 , serverID); |
| | | } |
| | | this.maxCNs.put(serverID, cn); |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Computing monitor data "); |
| | | |
| | | ServerState replServerState = this.getDbServerState(); |
| | | Iterator<Short> it = replServerState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | short sid = it.next(); |
| | | ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); |
| | | ChangeNumber maxCN = this.maxCNs.get(sid); |
| | | if ((maxCN != null) && (receivedCN.newer(maxCN))) |
| | | // Let's process our directly connected LSes |
| | | // - in the ServerHandler for a given LS1, the stored state contains : |
| | | // - the max CN produced by LS1 |
| | | // - the last CN consumed by LS1 from LS2..n |
| | | // - in the RSdomain/dbHandler, the built-in state contains : |
| | | // - the max CN produced by each server |
| | | // So for a given LS connected we can take the state and the max from |
| | | // the LS/state. |
| | | |
| | | for (ServerHandler directlsh : connectedServers.values()) |
| | | { |
| | | // We found a newer one |
| | | this.maxCNs.remove(sid); |
| | | this.maxCNs.put(sid, receivedCN); |
| | | short serverID = directlsh.getServerId(); |
| | | |
| | | // the state comes from the state stored in the SH |
| | | ServerState directlshState = directlsh.getServerState().duplicate(); |
| | | |
| | | // the max CN sent by that LS also comes from the SH |
| | | ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID); |
| | | if (maxcn == null) |
| | | { |
| | | // This directly connected LS has never produced any change |
| | | maxcn = new ChangeNumber(0, 0 , serverID); |
| | | } |
| | | wrkMonitorData.setMaxCN(serverID, maxcn); |
| | | wrkMonitorData.setLDAPServerState(serverID, directlshState); |
| | | wrkMonitorData.setFirstMissingDate(serverID, directlsh. |
| | | getApproxFirstMissingDate()); |
| | | } |
| | | |
| | | // Then initialize the max CN for the LS that produced something |
| | | // - from our own local db state |
| | | // - whether they are directly or indirectly connected |
| | | ServerState dbServerState = getDbServerState(); |
| | | Iterator<Short> it = dbServerState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | short sid = it.next(); |
| | | ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid); |
| | | wrkMonitorData.setMaxCN(sid, storedCN); |
| | | } |
| | | |
| | | // Now we have used all the available local information |
| | | // and we need the remote ones. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + " Local monitor data: " + |
| | | wrkMonitorData.toString()); |
| | | } |
| | | |
| | | // Send Request to the other Replication Servers |
| | | if (remoteMonitorResponsesSemaphore == null) |
| | | { |
| | | remoteMonitorResponsesSemaphore = new Semaphore( |
| | | replicationServers.size() -1); |
| | | |
| | | sendMonitorDataRequest(); |
| | | |
| | | remoteMonitorResponsesSemaphore = new Semaphore(0); |
| | | short requestCnt = sendMonitorDataRequest(); |
| | | // Wait for the responses from them, or for the timeout |
| | | waitMonitorDataResponses(replicationServers.size()); |
| | | waitMonitorDataResponses(requestCnt); |
| | | } |
| | | else |
| | | { |
| | | // The renewal of the monitor cache is already running. |
| | | // Make this thread wait until it ends. |
| | | // TODO: unit test for this case. |
| | | while (remoteMonitorResponsesSemaphore!=null) |
| | | { |
| | | waitMonitorDataResponses(1); |
| | | } |
| | | } |
| | | |
| | | // Now we have the expected answers or an error occurred |
| | | validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime; |
| | | wrkMonitorData.completeComputing(); |
| | | |
| | | if (debugEnabled()) |
| | | // Store the new computed data as the reference |
| | | synchronized(monitorData) |
| | | { |
| | | debugMonitorData(); |
| | | } |
| | | } |
| | | |
| | | private void debugMonitorData() |
| | | { |
| | | String mds = " Monitor data="; |
| | | Iterator<Short> ite = LDAPStates.keySet().iterator(); |
| | | while (ite.hasNext()) |
| | | { |
| | | Short sid = ite.next(); |
| | | ServerState ss = LDAPStates.get(sid); |
| | | mds += " LDAPState(" + sid + ")=" + ss.toString(); |
| | | } |
| | | Iterator<Short> itc = maxCNs.keySet().iterator(); |
| | | while (itc.hasNext()) |
| | | { |
| | | Short sid = itc.next(); |
| | | ChangeNumber cn = maxCNs.get(sid); |
| | | mds += " maxCNs(" + sid + ")=" + cn.toString(); |
| | | } |
| | | |
| | | mds += "--"; |
| | | TRACER.debugInfo( |
| | | // Now we have the expected answers or an error occurred |
| | | monitorData = wrkMonitorData; |
| | | wrkMonitorData = null; |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDN=" + baseDn + |
| | | mds); |
| | | " baseDn=" + baseDn + " *** Computed MonitorData: " + |
| | | monitorData.toString()); |
| | | } |
| | | return monitorData; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Sends a MonitorRequest message to all connected RS. |
| | | * @return the number of requests sent. |
| | | * @throws DirectoryException when a problem occurs. |
| | | */ |
| | | protected void sendMonitorDataRequest() |
| | | protected short sendMonitorDataRequest() |
| | | throws DirectoryException |
| | | { |
| | | short sent=0; |
| | | try |
| | | { |
| | | for (ServerHandler rs : replicationServers.values()) |
| | |
| | | MonitorRequestMessage(this.replicationServer.getServerId(), |
| | | rs.getServerId()); |
| | | rs.send(msg); |
| | | sent++; |
| | | } |
| | | } |
| | | catch(Exception e) |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message, e); |
| | | } |
| | | return sent; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + |
| | | " waiting for " + expectedResponses |
| | | + " expected monitor messages"); |
| | | |
| | | boolean allPermitsAcquired = |
| | | remoteMonitorResponsesSemaphore.tryAcquire( |
| | | expectedResponses, |
| | | (long) 500, TimeUnit.MILLISECONDS); |
| | | (long) 5000, TimeUnit.MILLISECONDS); |
| | | |
| | | if (!allPermitsAcquired) |
| | | { |
| | | logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); |
| | | // FIXME let's go on in best effort even with limited data received. |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Successfully received all " + replicationServers.size() |
| | | " baseDn=" + baseDn + |
| | | " Successfully received all " + expectedResponses |
| | | + " expected monitor messages"); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void receivesMonitorDataResponse(MonitorMessage msg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Receiving " + msg + " from " + msg.getsenderID() + |
| | | remoteMonitorResponsesSemaphore); |
| | | |
| | | if (remoteMonitorResponsesSemaphore == null) |
| | | { |
| | | // Ignoring the remote monitor data because an error occurred previously |
| | | // FIXME |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Receiving " + msg + " from " + msg.getsenderID() + |
| | | " remoteMonitorResponsesSemaphore should not be null")); |
| | | // Ignoring the remote monitor data because an error occurred |
| | | // previously |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Here is the RS state : list <serverID, lastChangeNumber> |
| | | // For each LDAP Server, we keep the max CN across the RSes |
| | | ServerState replServerState = msg.getReplServerState(); |
| | | Iterator<Short> it = replServerState.iterator(); |
| | | while (it.hasNext()) |
| | | synchronized(wrkMonitorData) |
| | | { |
| | | short sid = it.next(); |
| | | ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); |
| | | ChangeNumber maxCN = this.maxCNs.get(sid); |
| | | if (receivedCN.newer(maxCN)) |
| | | { |
| | | // We found a newer one |
| | | this.maxCNs.remove(sid); |
| | | this.maxCNs.put(sid, receivedCN); |
| | | } |
| | | } |
| | | // Here is the RS state : list <serverID, lastChangeNumber> |
| | | // For each LDAP Server, we keep the max CN across the RSes |
| | | ServerState replServerState = msg.getReplServerDbState(); |
| | | wrkMonitorData.setMaxCNs(replServerState); |
| | | |
| | | // Store the LDAP servers states |
| | | Iterator<Short> sidIterator = msg.iterator(); |
| | | while (sidIterator.hasNext()) |
| | | { |
| | | short sid = sidIterator.next(); |
| | | ServerState ss = msg.getLDAPServerState(sid); |
| | | this.LDAPStates.put(sid, ss); |
| | | this.approxFirstMissingDate.put(sid, |
| | | msg.getApproxFirstMissingDate(sid)); |
| | | // Store the remote LDAP servers states |
| | | Iterator<Short> lsidIterator = msg.ldapIterator(); |
| | | while (lsidIterator.hasNext()) |
| | | { |
| | | short sid = lsidIterator.next(); |
| | | wrkMonitorData.setLDAPServerState(sid, |
| | | msg.getLDAPServerState(sid).duplicate()); |
| | | wrkMonitorData.setFirstMissingDate(sid, |
| | | msg.getLDAPApproxFirstMissingDate(sid)); |
| | | } |
| | | |
| | | // Process the latency reported by the remote RSi on its connections |
| | | // to the other RSes |
| | | Iterator<Short> rsidIterator = msg.rsIterator(); |
| | | while (rsidIterator.hasNext()) |
| | | { |
| | | short rsid = rsidIterator.next(); |
| | | if (rsid == replicationServer.getServerId()) |
| | | { |
| | | // this is the latency of the remote RSi regarding the current RS |
| | | // let's update the fmd of my connected LS |
| | | for (ServerHandler connectedlsh : connectedServers.values()) |
| | | { |
| | | short connectedlsid = connectedlsh.getServerId(); |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // this is the latency of the remote RSi regarding another RSj |
| | | // let's update the latency of the LSes connected to RSj |
| | | ServerHandler rsjHdr = replicationServers.get(rsid); |
| | | for(short remotelsid : rsjHdr.getConnectedServerIds()) |
| | | { |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | wrkMonitorData.setFirstMissingDate(remotelsid, newfmd); |
| | | } |
| | | } |
| | | } |
| | | if (debugEnabled()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDn=" + baseDn + |
| | | " Processed msg from " + msg.getsenderID() + |
| | | " New monitor data: " + wrkMonitorData.toString()); |
| | | } |
| | | } |
| | | |
| | | // Decreases the number of expected responses and potentially |
| | | // wakes up the waiting requestor thread. |
| | | remoteMonitorResponsesSemaphore.release(); |
| | | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() + |
| | | stackTraceToSingleLineString(e))); |
| | | |
| | | // If an exception occurs while processing one of the expected messages, |
| | | // the processing is aborted and the waiting thread is woken up. |
| | | remoteMonitorResponsesSemaphore.notifyAll(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the state of the LDAP server with the provided serverId. |
| | | * @param serverId The server ID. |
| | | * @return The server state. |
| | | */ |
| | | public ServerState getServerState(short serverId) |
| | | { |
| | |   // Look up the state stored for this server id; HashMap.get yields |
| | |   // null when nothing has been recorded for it. |
| | |   final ServerState knownState = LDAPStates.get(serverId); |
| | |   return knownState; |
| | | } |
| | | |
| | | /** |
| | | * Get the highest known change number of the LDAP server with the provided |
| | | * serverId. |
| | | * @param serverId The server ID. |
| | | * @return The highest change number. |
| | | */ |
| | | public ChangeNumber getMaxCN(short serverId) |
| | | { |
| | |   // Look up the max change number stored for this server id; HashMap.get |
| | |   // yields null when nothing has been recorded for it. |
| | |   final ChangeNumber highestKnown = maxCNs.get(serverId); |
| | |   return highestKnown; |
| | | } |
| | | |
| | | /** |
| | | * Get an approximation of the date of the oldest missing change for the |
| | | * LDAP server with the provided serverId. |
| | | * @param serverId The server ID. |
| | | * @return The approximation of the date of the oldest missing change. |
| | | */ |
| | | public Long getApproxFirstMissingDate(short serverId) |
| | | { |
| | |   // Look up the approximate first-missing date stored for this server id; |
| | |   // HashMap.get yields null when nothing has been recorded for it. |
| | |   final Long firstMissing = approxFirstMissingDate.get(serverId); |
| | |   return firstMissing; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of missing change for the server with the provided state. |
| | | * @param state The provided server state. |
| | | * @return The number of missing changes. |
| | | */ |
| | | public int getMissingChanges(ServerState state) |
| | | { |
| | | // Traverse the max Cn transmitted by each server |
| | | // For each server, get the highest CN know from the current server |
| | | // Sum the difference betwenn the max and the last |
| | | int missingChanges = 0; |
| | | Iterator<Short> itc = maxCNs.keySet().iterator(); |
| | | while (itc.hasNext()) |
| | | { |
| | | Short sid = itc.next(); |
| | | ChangeNumber maxCN = maxCNs.get(sid); |
| | | ChangeNumber last = state.getMaxChangeNumber(sid); |
| | | if (last == null) |
| | | { |
| | | last = new ChangeNumber(0,0, sid); |
| | | } |
| | | int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last); |
| | | missingChanges += missingChangesFromSID; |
| | | } |
| | | return missingChanges; |
| | | } |
| | | |
| | | /** |
| | | * Set the purge delay on all the db Handlers for this Domain |
| | | * of Replication. |
| | | * |