| | |
| | | allInstances.add(this); |
| | | } |
| | | |
| | | /** |
| | |  * Converts a collection of server address strings into a set of HostPort |
| | |  * objects, parsing each string with HostPort.valueOf(). |
| | |  * |
| | |  * @param serverAddresses |
| | |  *          the server addresses to convert; may be null, which is treated |
| | |  *          the same as an empty collection |
| | |  * @return a new, mutable set holding the parsed addresses; never null |
| | |  */ |
| | | private Set<HostPort> toHostPorts(Collection<String> serverAddresses) |
| | | { |
| | |   final Set<HostPort> results = new HashSet<HostPort>(); |
| | |   if (serverAddresses == null) |
| | |   { |
| | |     // Callers pass configuration values that are null-checked elsewhere in |
| | |     // this class, so report "no servers" instead of failing with an NPE. |
| | |     return results; |
| | |   } |
| | |   for (String serverAddress : serverAddresses) |
| | |   { |
| | |     results.add(HostPort.valueOf(serverAddress)); |
| | |   } |
| | |   return results; |
| | | } |
| | | |
| | | /** |
| | | * Get the list of every replication server instantiated in the current VM. |
| | | * @return The list of every replication server instantiated in the current |
| | |
| | | { |
| | | while (!shutdown) |
| | | { |
| | | final String normalizedLocalURL = getNormalizedLocalURL(); |
| | | final HostPort localAddress = HostPort.localAddress(replicationPort); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | /* |
| | |
| | | * cannot guarantee this since the configuration may not contain this |
| | | * RS. |
| | | */ |
| | | final Set<String> connectedRSUrls = getConnectedRSUrls(domain); |
| | | for (String rsURL : replicationServerUrls) |
| | | final Set<HostPort> connectedRSAddresses = |
| | | getConnectedRSAddresses(domain); |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | { |
| | | final String normalizedServerURL = normalizeServerURL(rsURL); |
| | | if (connectedRSUrls.contains(normalizedServerURL)) |
| | | if (connectedRSAddresses.contains(rsAddress)) |
| | | { |
| | | continue; // Skip: already connected. |
| | | } |
| | | |
| | | // FIXME: this will need changing if we ever support listening on |
| | | // specific addresses. |
| | | if (normalizedServerURL.equals(normalizedLocalURL)) { |
| | | if (rsAddress.equals(localAddress)) |
| | | { |
| | | continue; // Skip: avoid connecting to self. |
| | | } |
| | | |
| | | connect(rsURL, domain.getBaseDN()); |
| | | connect(rsAddress, domain.getBaseDN()); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private Set<String> getConnectedRSUrls(ReplicationServerDomain domain) |
| | | private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain) |
| | | { |
| | | Set<String> results = new HashSet<String>(); |
| | | Set<HostPort> results = new HashSet<HostPort>(); |
| | | for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values()) |
| | | { |
| | | results.add(normalizeServerURL(rsHandler.getServerAddressURL())); |
| | | results.add(HostPort.valueOf(rsHandler.getServerAddressURL())); |
| | | } |
| | | return results; |
| | | } |
| | |
| | | /** |
| | | * Establish a connection to the server with the address and port. |
| | | * |
| | | * @param remoteServerURL |
| | | * The address and port for the server, separated by a colon. |
| | | * @param remoteServerAddress |
| | | * The address and port for the server |
| | | * @param baseDN |
| | | * The baseDN of the connection |
| | | */ |
| | | private void connect(String remoteServerURL, DN baseDN) |
| | | private void connect(HostPort remoteServerAddress, DN baseDN) |
| | | { |
| | | boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL); |
| | | boolean sslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to " |
| | | + remoteServerURL); |
| | | + remoteServerAddress); |
| | | |
| | | Socket socket = new Socket(); |
| | | Session session = null; |
| | | try |
| | | { |
| | | final HostPort hp = HostPort.valueOf(remoteServerURL); |
| | | final InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hp.getHost()), hp.getPort()); |
| | | socket.setTcpNoDelay(true); |
| | | int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); |
| | | socket.connect(serverAddr, timeoutMS); |
| | | socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS); |
| | | session = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | |
| | | ReplicationServerHandler rsHandler = new ReplicationServerHandler( |
| | |
| | | // They will be applied for next connections. Some others have immediate |
| | | // effect |
| | | |
| | | disconnectRemovedReplicationServers(configuration.getReplicationServer()); |
| | | disconnectRemovedReplicationServers( |
| | | toHostPorts(configuration.getReplicationServer())); |
| | | |
| | | replicationServerUrls = configuration.getReplicationServer(); |
| | | if (replicationServerUrls == null) |
| | |
| | | |
| | | /** |
| | | * Try and set a sensible URL for this replication server. Since we are |
| | | * listening on all addresses there are a couple of potential candidates: 1) a |
| | | * matching server url in the replication server's configuration, 2) hostname |
| | | * local address. |
| | | * listening on all addresses there are a couple of potential candidates: |
| | | * <ol> |
| | | * <li>a matching server URL in the replication server's configuration,</li> |
| | | * <li>hostname local address.</li> |
| | | * </ol> |
| | | */ |
| | | private void setServerURL() throws UnknownHostException |
| | | { |
| | |
| | | * First try the set of configured replication servers to see if one of them |
| | | * is this replication server (this should always be the case). |
| | | */ |
| | | for (String rsUrl : replicationServerUrls) |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | { |
| | | /* |
| | | * No need to validate the string format because the admin framework has |
| | | * already done it. |
| | | */ |
| | | final HostPort hp = HostPort.valueOf(rsUrl); |
| | | if (hp.getPort() == replicationPort && hp.isLocalAddress()) |
| | | if (rsAddress.getPort() == replicationPort && rsAddress.isLocalAddress()) |
| | | { |
| | | serverURL = rsUrl; |
| | | serverURL = rsAddress.toString(); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Fall-back to the machine hostname. |
| | | serverURL = InetAddress.getLocalHost().getHostName() + ":" |
| | | + replicationPort; |
| | | final String host = InetAddress.getLocalHost().getHostName(); |
| | | // Ensure correct formatting of IPv6 addresses by using a HostPort instance. |
| | | serverURL = new HostPort(host, replicationPort).toString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Compute the list of replication servers that are not any |
| | | * more connected to this Replication Server and stop the |
| | | * corresponding handlers. |
| | | * @param newReplServers the list of the new replication servers configured. |
| | | * Compute the list of replication servers that are not any more connected to |
| | | * this Replication Server and stop the corresponding handlers. |
| | | * |
| | | * @param newRSAddresses |
| | | * the list of addresses of the newly configured replication servers. |
| | | */ |
| | | private void disconnectRemovedReplicationServers( |
| | | Collection<String> newReplServers) |
| | | private void disconnectRemovedReplicationServers(Set<HostPort> newRSAddresses) |
| | | { |
| | | Collection<String> serversToDisconnect = new ArrayList<String>(); |
| | | final Collection<HostPort> serversToDisconnect = new ArrayList<HostPort>(); |
| | | |
| | | for (String rsUrl : replicationServerUrls) |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | { |
| | | if (!newReplServers.contains(rsUrl)) |
| | | if (!newRSAddresses.contains(rsAddress)) |
| | | { |
| | | try |
| | | { |
| | | // translate the server name into IP address and keep the port number |
| | | final HostPort hp = HostPort.valueOf(rsUrl); |
| | | final String hostAddress = |
| | | InetAddress.getByName(hp.getHost()).getHostAddress(); |
| | | serversToDisconnect.add(hostAddress + ":" + hp.getPort()); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl)); |
| | | } |
| | | serversToDisconnect.add(rsAddress); |
| | | } |
| | | } |
| | | |
| | |
| | | return this.changelogDB.getDBDirectoryName(); |
| | | } |
| | | |
| | | /** |
| | | * Normalize a URL so that this host's local address is used if the provided |
| | | * host name corresponds to a local interface. This method is design to work |
| | | * with getNormalizedLocalURL(). |
| | | */ |
| | | private String normalizeServerURL(final String url) |
| | | { |
| | | final HostPort hp = HostPort.valueOf(url); |
| | | try |
| | | { |
| | | InetAddress inetAddress = InetAddress.getByName(hp.getHost()); |
| | | if (HostPort.isLocalAddress(inetAddress)) |
| | | { |
| | | inetAddress = getLocalAddress(); |
| | | } |
| | | return inetAddress.getHostAddress() + ":" + hp.getPort(); |
| | | } |
| | | catch (UnknownHostException e) |
| | | { |
| | | // This should not happen, but if it does then just default to the |
| | | // original URL. |
| | | Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hp.getHost()); |
| | | logError(message); |
| | | return url; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private InetAddress getLocalAddress() |
| | | { |
| | | try |
| | | { |
| | | return InetAddress.getLocalHost(); |
| | | } |
| | | catch (UnknownHostException e) |
| | | { |
| | | try |
| | | { |
| | | return InetAddress.getByAddress("localhost", new byte[] { 0x7f, 0x00, |
| | | 0x00, 0x01 }); |
| | | } |
| | | catch (UnknownHostException never) |
| | | { |
| | | // Illegal address length. |
| | | throw new RuntimeException(never); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Return normalized local url suitable for comparison against result returned |
| | | * by normalizeServerURL(). |
| | | */ |
| | | private String getNormalizedLocalURL() |
| | | { |
| | | return getLocalAddress().getHostAddress() + ":" + replicationPort; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |