| File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogCache.java |
| | |
| | | /** |
| | | * This class define an in-memory cache that will be used to store |
| | | * the messages that have been received from an LDAP server or |
| | | * from another changelog server and that should be forwarded to |
| | | * from another replication server and that should be forwarded to |
| | | * other servers. |
| | | * |
| | | * The size of the cache is set by configuration. |
| | |
| | | * received to the disk and for trimming them |
| | | * Decision to trim can be based on disk space or age of the message |
| | | */ |
| | | public class ChangelogCache |
| | | public class ReplicationCache |
| | | { |
| | | private Object flowControlLock = new Object(); |
| | | private DN baseDn = null; |
| | |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new server register |
| | | * to this changelog server. |
| | | * to this replication server. |
| | | * |
| | | */ |
| | | private Map<Short, ServerHandler> connectedServers = |
| | | new ConcurrentHashMap<Short, ServerHandler>(); |
| | | |
| | | /* |
| | | * This map contains one ServerHandler for each changelog servers |
| | | * with which we are connected (so normally all the changelogs) |
| | | * This map contains one ServerHandler for each replication servers |
| | | * with which we are connected (so normally all the replication servers) |
| | | * the first update in the balanced tree is the next change that we |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new changelog server register |
| | | * to this changelog server. |
| | | * We add new TreeSet in the HashMap when a new replication server register |
| | | * to this replication server. |
| | | */ |
| | | private Map<Short, ServerHandler> changelogServers = |
| | | private Map<Short, ServerHandler> replicationServers = |
| | | new ConcurrentHashMap<Short, ServerHandler>(); |
| | | |
| | | /* |
| | |
| | | */ |
| | | private Map<Short, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private Changelog changelog; |
| | | private ReplicationServer replicationServer; |
| | | |
| | | /** |
| | | * Creates a new ChangelogCache associated to the DN baseDn. |
| | | * Creates a new ReplicationCache associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ChangelogCache. |
| | | * @param changelog the Changelog that created this changelog cache. |
| | | * @param baseDn The baseDn associated to the ReplicationCache. |
| | | * @param replicationServer the ReplicationServer that created this |
| | | * replicationServer cache. |
| | | */ |
| | | public ChangelogCache(DN baseDn, Changelog changelog) |
| | | public ReplicationCache(DN baseDn, ReplicationServer replicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.changelog = changelog; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | /** |
| | |
| | | /* |
| | | * TODO : In case that the source server is a LDAP server this method |
| | | * should check that change did get pushed to at least one |
| | | * other changelog server before pushing it to the LDAP servers |
| | | * other replication server before pushing it to the LDAP servers |
| | | */ |
| | | |
| | | sourceHandler.updateServerState(update); |
| | |
| | | int count = this.NumServers(); |
| | | if (count > 1) |
| | | { |
| | | if (sourceHandler.isChangelogServer()) |
| | | if (sourceHandler.isReplicationServer()) |
| | | ServerHandler.addWaitingAck(update, sourceHandler.getServerId(), |
| | | this, count - 1); |
| | | else |
| | |
| | | { |
| | | try |
| | | { |
| | | dbHandler = changelog.newDbHandler(id, baseDn); |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | | * from at least one LDAP server. |
| | | * This changelog therefore can't do it's job properly anymore |
| | | * This replicationServer therefore can't do it's job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR; |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | return; |
| | | } |
| | | sourceDbHandlers.put(id, dbHandler); |
| | |
| | | |
| | | |
| | | /* |
| | | * Push the message to the changelog servers |
| | | * Push the message to the replication servers |
| | | */ |
| | | if (!sourceHandler.isChangelogServer()) |
| | | if (!sourceHandler.isReplicationServer()) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | |
| | | |
| | | /** |
| | | * Create initialize context necessary for finding the changes |
| | | * that must be sent to a given LDAP or changelog server. |
| | | * that must be sent to a given LDAP or replication server. |
| | | * |
| | | * @param handler handler for the server that must be started |
| | | * @throws Exception when method has failed |
| | |
| | | { |
| | | handler.stopHandler(); |
| | | |
| | | if (handler.isChangelogServer()) |
| | | changelogServers.remove(handler.getServerId()); |
| | | if (handler.isReplicationServer()) |
| | | replicationServers.remove(handler.getServerId()); |
| | | else |
| | | connectedServers.remove(handler.getServerId()); |
| | | } |
| | | |
| | | /** |
| | | * Create initialize context necessary for finding the changes |
| | | * that must be sent to a given changelog server. |
| | | * that must be sent to a given replication server. |
| | | * |
| | | * @param handler the server ID to which we want to forward changes |
| | | * @throws Exception in case of errors |
| | | */ |
| | | public void startChangelog(ServerHandler handler) throws Exception |
| | | public void startReplicationServer(ServerHandler handler) throws Exception |
| | | { |
| | | /* |
| | | * create the balanced tree that will be used to forward changes |
| | | * TODO throw proper exception |
| | | */ |
| | | synchronized (changelogServers) |
| | | synchronized (replicationServers) |
| | | { |
| | | if (changelogServers.containsKey(handler.getServerId())) |
| | | if (replicationServers.containsKey(handler.getServerId())) |
| | | { |
| | | throw new Exception("changelog Id already registered"); |
| | | throw new Exception("Replication Server Id already registered"); |
| | | } |
| | | changelogServers.put(handler.getServerId(), handler); |
| | | replicationServers.put(handler.getServerId(), handler); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return a Set of String containing the lists of Changelog servers |
| | | * Return a Set of String containing the lists of Replication servers |
| | | * connected to this server. |
| | | * @return the set of connected servers |
| | | */ |
| | |
| | | { |
| | | LinkedHashSet<String> mySet = new LinkedHashSet<String>(); |
| | | |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | mySet.add(handler.getServerAddressURL()); |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * Return a Set containing the servers known by this changelog. |
| | | * @return a set containing the servers known by this changelog. |
| | | * Return a Set containing the servers known by this replicationServer. |
| | | * @return a set containing the servers known by this replicationServer. |
| | | */ |
| | | public Set<Short> getServers() |
| | | { |
| | |
| | | * |
| | | * @param serverId Identifier of the server for which the iterator is created. |
| | | * @param changeNumber Starting point for the iterator. |
| | | * @return the created ChangelogIterator. |
| | | * @return the created ReplicationIterator. |
| | | */ |
| | | public ChangelogIterator getChangelogIterator(short serverId, |
| | | public ReplicationIterator getChangelogIterator(short serverId, |
| | | ChangeNumber changeNumber) |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | |
| | | } |
| | | |
| | | /** |
| | | * creates a new ChangelogDB with specified identifier. |
| | | * @param id the identifier of the new ChangelogDB. |
| | | * creates a new ReplicationDB with specified identifier. |
| | | * @param id the identifier of the new ReplicationDB. |
| | | * @param db the new db. |
| | | * |
| | | * @throws DatabaseException If a database error happened. |
| | |
| | | */ |
| | | private int NumServers() |
| | | { |
| | | return changelogServers.size() + connectedServers.size(); |
| | | return replicationServers.size() + connectedServers.size(); |
| | | } |
| | | |
| | | |
| | |
| | | * In this case, we can find the handler from the connectedServers map |
| | | * - the message that was acked comes from a server to which we are not |
| | | * connected. |
| | | * In this case we need to find the changelog server that forwarded |
| | | * In this case we need to find the replication server that forwarded |
| | | * the change and send back the ack to this server. |
| | | */ |
| | | ServerHandler handler = connectedServers.get( |
| | |
| | | } |
| | | else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) |
| | | { |
| | | if (!senderHandler.isChangelogServer()) |
| | | if (!senderHandler.isReplicationServer()) |
| | | { |
| | | // Send to all changelogServers |
| | | for (ServerHandler destinationHandler : changelogServers.values()) |
| | | // Send to all replicationServers |
| | | for (ServerHandler destinationHandler : replicationServers.values()) |
| | | { |
| | | servers.add(destinationHandler); |
| | | } |
| | |
| | | if (senderHandler.isLDAPserver()) |
| | | { |
| | | // let's forward to the other changelogs |
| | | servers.addAll(changelogServers.values()); |
| | | servers.addAll(replicationServers.values()); |
| | | } |
| | | } |
| | | } |
| | |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * change was an LDAP server or a ReplicationServer. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) |
| | | { |
| | |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * change was an LDAP server or a ReplicationServer. |
| | | * @param serverId The identifier of the server from which we |
| | | * received the change.. |
| | | */ |
| | |
| | | if (isLDAPserver) |
| | | handler = connectedServers.get(serverId); |
| | | else |
| | | handler = changelogServers.get(serverId); |
| | | handler = replicationServers.get(serverId); |
| | | |
| | | // TODO : check for null handler and log error |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ChangelogCache. |
| | | * Shutdown this ReplicationCache. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | // Close session with other changelogs |
| | | for (ServerHandler serverHandler : changelogServers.values()) |
| | | for (ServerHandler serverHandler : replicationServers.values()) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "ChangelogCache " + baseDn; |
| | | return "ReplicationCache " + baseDn; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | |
| | | */ |
| | | public boolean restartAfterSaturation(ServerHandler sourceHandler) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |