| | |
| | | BackupTaskListener, RestoreTaskListener, ImportTaskListener, |
| | | ExportTaskListener |
| | | { |
| | | private int serverId; |
| | | private String serverURL; |
| | | |
| | | private ServerSocket listenSocket; |
| | | private Thread listenThread; |
| | | private Thread connectThread; |
| | | |
| | | /** The list of replication server URLs configured by the administrator. */ |
| | | private Collection<String> replicationServerUrls; |
| | | /** The current configuration of this replication server. */ |
| | | private ReplicationServerCfg config; |
| | | |
| | | /** |
| | | * This table is used to store the list of dn for which we are currently |
| | |
| | | private final Map<DN, ReplicationServerDomain> baseDNs = |
| | | new HashMap<DN, ReplicationServerDomain>(); |
| | | |
| | | private volatile boolean shutdown = false; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | | private final ChangelogDB changelogDB; |
| | | |
| | | /** |
| | | * The delay (in sec) after which the changes must be deleted from the |
| | | * persistent storage. |
| | | */ |
| | | private long purgeDelay; |
| | | |
| | | private int replicationPort; |
| | | private volatile boolean shutdown = false; |
| | | private boolean stopListen = false; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | |
| | | /** ID of the backend. */ |
| | | private static final String backendId = "replicationChanges"; |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | /** Timeout (in milliseconds) when waiting for acknowledgments. */ |
| | | private long assuredTimeout = 1000; |
| | | |
| | | /** Group id. */ |
| | | private byte groupId = 1; |
| | | |
| | | /** |
| | | * Number of pending changes for a DS, considered as threshold value to put |
| | | * the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled. |
| | | */ |
| | | private int degradedStatusThreshold = 5000; |
| | | |
| | | /** |
| | | * Number of milliseconds to wait before sending new monitoring messages. If |
| | | * value is 0, monitoring publisher is disabled. |
| | | */ |
| | | private long monitoringPublisherPeriod = 3000; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | |
| | | private long domainTicket = 0L; |
| | | |
| | | /** |
| | | * The weight affected to the replication server. |
| | | * Each replication server of the topology has a weight. When combined |
| | | * together, the weights of the replication servers of a same group can be |
| | | * translated to a percentage that determines the quantity of directory |
| | | * servers of the topology that should be connected to a replication server. |
| | | * For instance imagine a topology with 3 replication servers (with the same |
| | | * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that |
| | | * RS1 should have 25% of the directory servers connected in the topology, |
| | | * RS2 25%, and RS3 50%. This may be useful if the replication servers of the |
| | | * topology have a different power and one wants to spread the load between |
| | | * the replication servers according to their power. |
| | | */ |
| | | private int weight = 1; |
| | | |
| | | /** |
| | | * Holds the list of all replication servers instantiated in this VM. |
| | | * This allows to perform clean up of the RS databases in unit tests. |
| | | */ |
| | |
| | | public ReplicationServer(ReplicationServerCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | replicationPort = configuration.getReplicationPort(); |
| | | serverId = configuration.getReplicationServerId(); |
| | | replicationServerUrls = configuration.getReplicationServer(); |
| | | if (replicationServerUrls == null) |
| | | replicationServerUrls = new ArrayList<String>(); |
| | | queueSize = configuration.getQueueSize(); |
| | | purgeDelay = configuration.getReplicationPurgeDelay(); |
| | | rcvWindow = configuration.getWindowSize(); |
| | | this.config = configuration; |
| | | |
| | | this.changelogDB = |
| | | new JEChangelogDB(this, configuration.getReplicationDBDirectory()); |
| | | |
| | | groupId = (byte)configuration.getGroupId(); |
| | | weight = configuration.getWeight(); |
| | | assuredTimeout = configuration.getAssuredTimeout(); |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | monitoringPublisherPeriod = configuration.getMonitoringPeriod(); |
| | | |
| | | replSessionSecurity = new ReplSessionSecurity(); |
| | | initialize(); |
| | | configuration.addChangeListener(this); |
| | |
| | | { |
| | | backendConfigEntryDN = |
| | | DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config"); |
| | | } catch (Exception e) { /* do nothing */ } |
| | | } catch (DirectoryException e) { /* do nothing */ } |
| | | |
| | | // Creates the backend associated to this ReplicationServer |
| | | // if it does not exist. |
| | |
| | | DirectoryServer.registerExportTaskListener(this); |
| | | DirectoryServer.registerImportTaskListener(this); |
| | | |
| | | localPorts.add(replicationPort); |
| | | localPorts.add(getReplicationPort()); |
| | | |
| | | // Keep track of this new instance |
| | | allInstances.add(this); |
| | | } |
| | | |
| | | private Set<HostPort> toHostPorts(Collection<String> serverAddresses) |
| | | private Set<HostPort> getConfiguredRSAddresses() |
| | | { |
| | | final Set<HostPort> results = new HashSet<HostPort>(); |
| | | for (String serverAddress : serverAddresses) |
| | | for (String serverAddress : this.config.getReplicationServer()) |
| | | { |
| | | results.add(HostPort.valueOf(serverAddress)); |
| | | } |
| | |
| | | |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | | final int queueSize = this.config.getQueueSize(); |
| | | final int rcvWindow = this.config.getWindowSize(); |
| | | if (msg instanceof ServerStartMsg) |
| | | { |
| | | DataServerHandler dsHandler = new DataServerHandler( |
| | |
| | | { |
| | | while (!shutdown) |
| | | { |
| | | final HostPort localAddress = HostPort.localAddress(replicationPort); |
| | | HostPort localAddress = HostPort.localAddress(getReplicationPort()); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | /* |
| | |
| | | */ |
| | | final Set<HostPort> connectedRSAddresses = |
| | | getConnectedRSAddresses(domain); |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | for (HostPort rsAddress : getConfiguredRSAddresses()) |
| | | { |
| | | if (connectedRSAddresses.contains(rsAddress)) |
| | | { |
| | |
| | | session = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | |
| | | ReplicationServerHandler rsHandler = new ReplicationServerHandler( |
| | | session, queueSize, this, rcvWindow); |
| | | session, config.getQueueSize(), this, config.getWindowSize()); |
| | | rsHandler.connect(baseDN, sslEncryption); |
| | | } |
| | | catch (Exception e) |
| | |
| | | |
| | | setServerURL(); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.bind(new InetSocketAddress(replicationPort)); |
| | | listenSocket.bind(new InetSocketAddress(getReplicationPort())); |
| | | |
| | | // creates working threads: we must first connect, then start to listen. |
| | | if (debugEnabled()) |
| | |
| | | logError(ERR_UNKNOWN_HOSTNAME.get()); |
| | | } catch (IOException e) |
| | | { |
| | | Message message = |
| | | ERR_COULD_NOT_BIND_CHANGELOG.get(replicationPort, e.getMessage()); |
| | | Message message = ERR_COULD_NOT_BIND_CHANGELOG.get( |
| | | getReplicationPort(), e.getMessage()); |
| | | logError(message); |
| | | } catch (DirectoryException e) |
| | | { |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | localPorts.remove(replicationPort); |
| | | localPorts.remove(getReplicationPort()); |
| | | |
| | | if (shutdown) |
| | | return; |
| | |
| | | */ |
| | | public long getTrimAge() |
| | | { |
| | | return purgeDelay * 1000; |
| | | return this.config.getReplicationPurgeDelay() * 1000; |
| | | } |
| | | |
| | | /** |
| | |
| | | // Some of those properties change don't need specific code. |
| | | // They will be applied for next connections. Some others have immediate |
| | | // effect |
| | | final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses(); |
| | | |
| | | disconnectRemovedReplicationServers( |
| | | toHostPorts(configuration.getReplicationServer())); |
| | | final ReplicationServerCfg oldConfig = this.config; |
| | | this.config = configuration; |
| | | |
| | | replicationServerUrls = configuration.getReplicationServer(); |
| | | if (replicationServerUrls == null) |
| | | replicationServerUrls = new ArrayList<String>(); |
| | | disconnectRemovedReplicationServers(oldRSAddresses); |
| | | |
| | | queueSize = configuration.getQueueSize(); |
| | | long newPurgeDelay = configuration.getReplicationPurgeDelay(); |
| | | if (newPurgeDelay != purgeDelay) |
| | | final long newPurgeDelay = config.getReplicationPurgeDelay(); |
| | | if (newPurgeDelay != oldConfig.getReplicationPurgeDelay()) |
| | | { |
| | | purgeDelay = newPurgeDelay; |
| | | this.changelogDB.setPurgeDelay(purgeDelay * 1000); |
| | | this.changelogDB.setPurgeDelay(getTrimAge()); |
| | | } |
| | | |
| | | rcvWindow = configuration.getWindowSize(); |
| | | assuredTimeout = configuration.getAssuredTimeout(); |
| | | |
| | | // changing the listen port requires to stop the listen thread |
| | | // and restart it. |
| | | int newPort = configuration.getReplicationPort(); |
| | | if (newPort != replicationPort) |
| | | if (getReplicationPort() != oldConfig.getReplicationPort()) |
| | | { |
| | | stopListen = true; |
| | | try |
| | |
| | | listenThread.join(); |
| | | stopListen = false; |
| | | |
| | | replicationPort = newPort; |
| | | setServerURL(); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.bind(new InetSocketAddress(replicationPort)); |
| | | listenSocket.bind(new InetSocketAddress(getReplicationPort())); |
| | | |
| | | listenThread = new ReplicationServerListenThread(this); |
| | | listenThread.start(); |
| | |
| | | } |
| | | } |
| | | |
| | | // Update threshold value for status analyzers (stop them if requested |
| | | // value is 0) |
| | | if (degradedStatusThreshold != configuration.getDegradedStatusThreshold()) |
| | | // Update threshold value for status analyzers |
| | | final int newThreshold = config.getDegradedStatusThreshold(); |
| | | if (oldConfig.getDegradedStatusThreshold() != newThreshold) |
| | | { |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | domain.updateDegradedStatusThreshold(degradedStatusThreshold); |
| | | domain.updateDegradedStatusThreshold(newThreshold); |
| | | } |
| | | } |
| | | |
| | | // Update period value for monitoring publishers (stop them if requested |
| | | // value is 0) |
| | | if (monitoringPublisherPeriod != configuration.getMonitoringPeriod()) |
| | | // Update period value for monitoring publishers |
| | | if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod()) |
| | | { |
| | | monitoringPublisherPeriod = configuration.getMonitoringPeriod(); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | domain.updateMonitoringPeriod(monitoringPublisherPeriod); |
| | | domain.updateMonitoringPeriod(config.getMonitoringPeriod()); |
| | | } |
| | | } |
| | | |
| | | // Changed the group id ? |
| | | byte newGroupId = (byte) configuration.getGroupId(); |
| | | if (newGroupId != groupId) |
| | | if (config.getGroupId() != oldConfig.getGroupId()) |
| | | { |
| | | groupId = newGroupId; |
| | | // Have a new group id: Disconnect every servers. |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | |
| | | } |
| | | |
| | | // Set a potential new weight |
| | | if (weight != configuration.getWeight()) |
| | | if (oldConfig.getWeight() != config.getWeight()) |
| | | { |
| | | weight = configuration.getWeight(); |
| | | // Broadcast the new weight the the whole topology. This will make some |
| | | // DSs reconnect (if needed) to other RSs according to the new weight of |
| | | // this RS. |
| | | broadcastConfigChange(); |
| | | } |
| | | |
| | | final String newDir = configuration.getReplicationDBDirectory(); |
| | | if (newDir != null && !this.changelogDB.getDBDirectoryName().equals(newDir)) |
| | | final String newDir = config.getReplicationDBDirectory(); |
| | | if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory())) |
| | | { |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, true); |
| | | } |
| | |
| | | * First try the set of configured replication servers to see if one of them |
| | | * is this replication server (this should always be the case). |
| | | */ |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | for (HostPort rsAddress : getConfiguredRSAddresses()) |
| | | { |
| | | /* |
| | | * No need validate the string format because the admin framework has |
| | | * already done it. |
| | | */ |
| | | if (rsAddress.getPort() == replicationPort && rsAddress.isLocalAddress()) |
| | | if (rsAddress.getPort() == getReplicationPort() |
| | | && rsAddress.isLocalAddress()) |
| | | { |
| | | serverURL = rsAddress.toString(); |
| | | return; |
| | |
| | | // Fall-back to the machine hostname. |
| | | final String host = InetAddress.getLocalHost().getHostName(); |
| | | // Ensure correct formatting of IPv6 addresses by using a HostPort instance. |
| | | serverURL = new HostPort(host, replicationPort).toString(); |
| | | serverURL = new HostPort(host, getReplicationPort()).toString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | return this.config.getReplicationServerId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getQueueSize() |
| | | { |
| | | return queueSize; |
| | | return this.config.getQueueSize(); |
| | | } |
| | | |
| | | /** |
| | | * Creates the backend associated to this replication server. |
| | | * @throws ConfigException |
| | | */ |
| | | private void createBackend() |
| | | throws ConfigException |
| | | private void createBackend() throws ConfigException |
| | | { |
| | | try |
| | | { |
| | |
| | | |
| | | /** |
| | | * Get the assured mode timeout. |
| | | * <p> |
| | | * It is the Timeout (in milliseconds) when waiting for acknowledgments. |
| | | * |
| | | * @return The assured mode timeout. |
| | | */ |
| | | public long getAssuredTimeout() |
| | | { |
| | | return assuredTimeout; |
| | | return this.config.getAssuredTimeout(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | return (byte) this.config.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | | * Get the threshold value for status analyzer. |
| | | * @return The threshold value for status analyzer. |
| | | * Get the degraded status threshold value for status analyzer. |
| | | * <p> |
| | | * The degraded status threshold is the number of pending changes for a DS, |
| | | * considered as threshold value to put the DS in DEGRADED_STATUS. If value is |
| | | * 0, status analyzer is disabled. |
| | | * |
| | | * @return The degraded status threshold value for status analyzer. |
| | | */ |
| | | public int getDegradedStatusThreshold() |
| | | { |
| | | return degradedStatusThreshold; |
| | | return this.config.getDegradedStatusThreshold(); |
| | | } |
| | | |
| | | /** |
| | | * Get the monitoring publisher period value. |
| | | * <p> |
| | | * It is the number of milliseconds to wait before sending new monitoring |
| | | * messages. If value is 0, monitoring publisher is disabled. |
| | | * |
| | | * @return the monitoring publisher period value. |
| | | */ |
| | | public long getMonitoringPublisherPeriod() |
| | | { |
| | | return monitoringPublisherPeriod; |
| | | return this.config.getMonitoringPeriod(); |
| | | } |
| | | |
| | | /** |
| | | * Compute the list of replication servers that are not any more connected to |
| | | * this Replication Server and stop the corresponding handlers. |
| | | * |
| | | * @param newRSAddresses |
| | | * the list of addresses of the newly configured replication servers. |
| | | * @param oldRSAddresses |
| | | * the old list of configured replication servers addresses. |
| | | */ |
| | | private void disconnectRemovedReplicationServers(Set<HostPort> newRSAddresses) |
| | | private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses) |
| | | { |
| | | final Collection<HostPort> serversToDisconnect = new ArrayList<HostPort>(); |
| | | |
| | | for (HostPort rsAddress : toHostPorts(replicationServerUrls)) |
| | | final Set<HostPort> newRSAddresses = getConfiguredRSAddresses(); |
| | | for (HostPort oldRSAddress : oldRSAddresses) |
| | | { |
| | | if (!newRSAddresses.contains(rsAddress)) |
| | | if (!newRSAddresses.contains(oldRSAddress)) |
| | | { |
| | | serversToDisconnect.add(rsAddress); |
| | | serversToDisconnect.add(oldRSAddress); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Replication Server " + replicationPort + " " + serverId; |
| | | return "Replication Server " + getReplicationPort() + " " + getServerId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getReplicationPort() |
| | | { |
| | | return replicationPort; |
| | | return config.getReplicationPort(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the weight. |
| | | * Gets the weight affected to the replication server. |
| | | * <p> |
| | | * Each replication server of the topology has a weight. When combined |
| | | * together, the weights of the replication servers of a same group can be |
| | | * translated to a percentage that determines the quantity of directory |
| | | * servers of the topology that should be connected to a replication server. |
| | | * <p> |
| | | * For instance imagine a topology with 3 replication servers (with the same |
| | | * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that |
| | | * RS1 should have 25% of the directory servers connected in the topology, RS2 |
| | | * 25%, and RS3 50%. This may be useful if the replication servers of the |
| | | * topology have a different power and one wants to spread the load between |
| | | * the replication servers according to their power. |
| | | * |
| | | * @return the weight |
| | | */ |
| | | public int getWeight() |
| | | { |
| | | return weight; |
| | | return this.config.getWeight(); |
| | | } |
| | | |
| | | |
| | | |
| | | private Collection<ReplicationServerDomain> getReplicationServerDomains() |
| | | { |
| | | synchronized (baseDNs) |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "RS(" + serverId + ") on " + serverURL + ", domains=" |
| | | return "RS(" + getServerId() + ") on " + serverURL + ", domains=" |
| | | + baseDNs.keySet(); |
| | | } |
| | | |