| | |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | |
| | | |
| | | |
| | | /** |
| | | * When this Handler is connected to a remote replication server |
| | | * When this Handler is related to a remote replication server |
| | | * this collection will contain as many elements as there are |
| | | * LDAP servers connected to the remote replication server. |
| | | */ |
| | | private List<LightweightServerHandler> |
| | | remoteLDAPservers = new ArrayList<LightweightServerHandler>(); |
| | | private Map<Short, LightweightServerHandler> connectedServers = |
| | | new ConcurrentHashMap<Short, LightweightServerHandler>(); |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | long localGenerationId = -1; |
| | | boolean handshakeOnly = false; |
| | | |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | |
| | | maxSendQueue = receivedMsg.getMaxSendQueue(); |
| | | heartbeatInterval = receivedMsg.getHeartbeatInterval(); |
| | | |
| | | handshakeOnly = receivedMsg.isHandshakeOnly(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = receivedMsg.getSSLEncryption(); |
| | | |
| | |
| | | replicationServerDomain = replicationServer. |
| | | getReplicationServerDomain(this.baseDn,true); |
| | | |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | | if (!handshakeOnly) |
| | | { |
| | | started = replicationServerDomain.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | started = replicationServerDomain.startReplicationServer(this); |
| | | } |
| | | |
| | | if (started) |
| | | { |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | | { |
| | | heartbeatThread = new HeartbeatThread( |
| | | "replication Heartbeat to " + serverURL + |
| | | " for " + this.baseDn, |
| | | session, heartbeatInterval/3); |
| | | heartbeatThread.start(); |
| | | started = replicationServerDomain.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | started = replicationServerDomain.startReplicationServer(this); |
| | | } |
| | | |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | | else |
| | | { |
| | | // the connection is not valid, close it. |
| | | try |
| | | if (started) |
| | | { |
| | | if (debugEnabled()) |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | heartbeatThread = new HeartbeatThread( |
| | | "replication Heartbeat to " + serverURL + |
| | | " for " + this.baseDn, |
| | | session, heartbeatInterval/3); |
| | | heartbeatThread.start(); |
| | | } |
| | | session.close(); |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | | else |
| | | { |
| | | // the connection is not valid, close it. |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | } |
| | | session.close(); |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // For a handshakeOnly connection, let's only create a reader |
| | | // in order to detect the connection closure. |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader.start(); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | /** |
| | | * Get the age of the older change that has not yet been replicated |
| | | * to the server handled by this ServerHandler. |
| | | * |
| | | * @return The age of the older change that has not yet been replicated |
| | | * to the server handled by this ServerHandler. |
| | | */ |
| | | public Long getApproxFirstMissingDate() |
| | | { |
| | | // Get the older CN received |
| | | // From it, get the next sequence number |
| | | // Get the CN for the next sequence number |
| | | // If not present in the local RS db, |
| | | // then approximate with the older update time |
| | | ChangeNumber olderUpdateCN = getOlderUpdateCN(); |
| | | if (olderUpdateCN == null) |
| | | return null; |
| | | Long result = (long)0; |
| | | |
| | | return olderUpdateCN.getTime(); |
| | | // Get the older CN received |
| | | ChangeNumber olderUpdateCN = getOlderUpdateCN(); |
| | | if (olderUpdateCN != null) |
| | | { |
| | | // If not present in the local RS db, |
| | | // then approximate with the older update time |
| | | result=olderUpdateCN.getTime(); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Get the older Change Number for that server. |
| | | * Returns null when the queue is empty. |
| | | * @return The older change number. |
| | | */ |
| | | public ChangeNumber getOlderUpdateCN() |
| | | { |
| | | ChangeNumber result = null; |
| | | synchronized (msgQueue) |
| | | { |
| | | if (isFollowing()) |
| | | { |
| | | if (msgQueue.isEmpty()) |
| | | return null; |
| | | |
| | | UpdateMessage msg = msgQueue.first(); |
| | | return msg.getChangeNumber(); |
| | | { |
| | | result=null; |
| | | } |
| | | else |
| | | { |
| | | UpdateMessage msg = msgQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (lateQueue.isEmpty()) |
| | | return null; |
| | | { |
| | | // isFollowing is false AND lateQueue is empty |
| | | // We may be at the very moment when the writer has emptied the |
| | | // lateQueue when it sent the last update. The writer will fill |
| | | // the lateQueue again when it sends the next update but we are not yet |
| | | // there. So let's take the last change not sent directly from |
| | | // the db. |
| | | |
| | | UpdateMessage msg = lateQueue.first(); |
| | | return msg.getChangeNumber(); |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | try |
| | | { |
| | | // Build a sorted set of candidate iterators (one per db, i.e. per server) |
| | | for (short serverId : replicationServerDomain.getServers()) |
| | | { |
| | | // get the last already sent CN from that server |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | // get an iterator in this server db from that last change |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | // if that iterator has changes, then it is a candidate |
| | | // it is added in the sorted list at a position given by its |
| | | // current change (see ReplicationIteratorComparator). |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | } |
| | | UpdateMessage msg = iteratorSortedSet.first().getChange(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | result=null; |
| | | } |
| | | finally |
| | | { |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | UpdateMessage msg = lateQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | while (msgQueue.size() > maxQueueSize) |
| | | { |
| | | following = false; |
| | | setFollowing(false); |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | // The loop below relies on the fact that the set is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | | // Hence it is necessary to remove and possibly re-add an iterator |
| | | // when looping in order to keep the order of the iterators |
| | | // consistent (see ReplicationIteratorComparator). |
| | | while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | |
| | | { |
| | | if (msgQueue.size() < maxQueueSize) |
| | | { |
| | | following = true; |
| | | setFollowing(true); |
| | | } |
| | | } |
| | | } |
| | |
| | | if (msgQueue.contains(msg)) |
| | | { |
| | | /* we finally caught up with the regular queue */ |
| | | following = true; |
| | | setFollowing(true); |
| | | lateQueue.clear(); |
| | | UpdateMessage msg1; |
| | | do |
| | |
| | | attributes.add(new Attribute("connected-to", this.replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName())); |
| | | |
| | | // Add the oldest missing update |
| | | Long olderUpdateTime = this.getApproxFirstMissingDate(); |
| | | if (olderUpdateTime != null) |
| | | { |
| | | Date date = new Date(olderUpdateTime); |
| | | attributes.add(new Attribute("approx-older-change-not-synchronized", |
| | | date.toString())); |
| | | } |
| | | } |
| | | else |
| | | { |
| | |
| | | attributes.add(new Attribute("base-dn", |
| | | baseDn.toString())); |
| | | |
| | | // Update stats |
| | | |
| | | // Retrieves the topology counters |
| | | if (serverIsLDAPserver) |
| | | { |
| | | MonitorData md; |
| | | try |
| | | { |
| | | replicationServerDomain.retrievesRemoteMonitorData(); |
| | | md = replicationServerDomain.getMonitorData(); |
| | | |
| | | // Oldest missing update |
| | | Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0)) |
| | | { |
| | | Date date = new Date(approxFirstMissingDate); |
| | | attributes.add(new Attribute("approx-older-change-not-synchronized", |
| | | date.toString())); |
| | | attributes.add( |
| | | new Attribute("approx-older-change-not-synchronized-millis", |
| | | String.valueOf(approxFirstMissingDate))); |
| | | } |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChanges(serverId); |
| | | attributes.add(new Attribute("missing-changes", |
| | | String.valueOf(missingChanges))); |
| | | |
| | | // Replication delay |
| | | long delay = md.getApproxDelay(serverId); |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(delay))); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // FIXME: We failed retrieving the remote monitor data |
| | | // TODO: improve the log |
| | | // We failed retrieving the remote monitor data. |
| | | attributes.add(new Attribute("error", |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | |
| | | // Compute the latency for the current SH |
| | | int missingChanges = |
| | | replicationServerDomain.getMissingChanges(serverState); |
| | | |
| | | // add the latency attribute to our monitor data |
| | | attributes.add(new Attribute("missing-changes", |
| | | String.valueOf(missingChanges))); |
| | | } |
| | | |
| | | // Deprecated |
| | |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | // Age of oldest missing change |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(getApproxDelay()))); |
| | | |
| | | // Date of the oldest missing change |
| | | long olderUpdateTime = getOlderUpdateTime(); |
| | |
| | | List<String> newRemoteLDAPservers = infoMsg.getConnectedServers(); |
| | | generationId = infoMsg.getGenerationId(); |
| | | |
| | | synchronized(remoteLDAPservers) |
| | | synchronized(connectedServers) |
| | | { |
| | | // Removes the existing structures |
| | | for (LightweightServerHandler lsh : remoteLDAPservers) |
| | | for (LightweightServerHandler lsh : connectedServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | remoteLDAPservers.clear(); |
| | | connectedServers.clear(); |
| | | |
| | | // Creates the new structure according to the message received. |
| | | for (String newConnectedServer : newRemoteLDAPservers) |
| | |
| | | LightweightServerHandler lsh |
| | | = new LightweightServerHandler(newConnectedServer, this); |
| | | lsh.startHandler(); |
| | | remoteLDAPservers.add(lsh); |
| | | connectedServers.put(lsh.getServerId(), lsh); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public boolean isRemoteLDAPServer(short wantedServer) |
| | | { |
| | | for (LightweightServerHandler server : remoteLDAPservers) |
| | | synchronized(connectedServers) |
| | | { |
| | | if (wantedServer == server.getServerId()) |
| | | for (LightweightServerHandler server : connectedServers.values()) |
| | | { |
| | | return true; |
| | | if (wantedServer == server.getServerId()) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean hasRemoteLDAPServers() |
| | | { |
| | | return !remoteLDAPservers.isEmpty(); |
| | | return !connectedServers.isEmpty(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return this.replicationServerDomain; |
| | | } |
| | | |
| | | /** |
| | | * Return a Set containing the ids of the servers currently held in the |
| | | * connectedServers map of this handler. |
| | | * <p> |
| | | * The result is the map's own key set, not a copy: it reflects later |
| | | * changes made to connectedServers, and removing an id from the returned |
| | | * set removes the matching entry from the map. |
| | | * |
| | | * @return the set of server ids used as keys of connectedServers. |
| | | */ |
| | | public Set<Short> getConnectedServerIds() |
| | | { |
| | | return connectedServers.keySet(); |
| | | } |
| | | } |