| File was renamed from opends/src/server/org/opends/server/replication/server/Changelog.java |
| | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * Changelog Listener. |
| | | * ReplicationServer Listener. |
| | | * |
| | | * This singleton is the main object of the changelog server |
| | | * This singleton is the main object of the replication server |
| | | * It waits for the incoming connections and creates listener |
| | | * and publisher objects for |
| | | * connection with LDAP servers and with changelog servers |
| | | * connection with LDAP servers and with replication servers |
| | | * |
| | | * It is responsible for creating the changelog cache and managing it |
| | | * It is responsible for creating the replication server cache and managing it |
| | | */ |
| | | public class Changelog |
| | | public class ReplicationServer |
| | | implements Runnable, ConfigurableComponent, |
| | | ConfigurationChangeListener<ChangelogServerCfg> |
| | | { |
| | |
| | | |
| | | private boolean runListen = true; |
| | | |
| | | /* The list of changelog servers configured by the administrator */ |
| | | private Collection<String> changelogServers; |
| | | /* The list of replication servers configured by the administrator */ |
| | | private Collection<String> replicationServers; |
| | | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private HashMap<DN, ChangelogCache> baseDNs = |
| | | new HashMap<DN, ChangelogCache>(); |
| | | private HashMap<DN, ReplicationCache> baseDNs = |
| | | new HashMap<DN, ReplicationCache>(); |
| | | |
| | | private String localURL = "null"; |
| | | private boolean shutdown = false; |
| | |
| | | private DN configDn; |
| | | private List<ConfigAttribute> configAttributes = |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private ReplicationDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | | private String dbDirname = null; |
| | |
| | | // be deleted from the persistent storage. |
| | | |
| | | /** |
| | | * Creates a new Changelog using the provided configuration entry. |
| | | * Creates a new Replication server using the provided configuration entry. |
| | | * |
| | | * @param configuration The configuration of this changelog. |
| | | * @param configuration The configuration of this replication server. |
| | | * @throws ConfigException When Configuration is invalid. |
| | | */ |
| | | public Changelog(ChangelogServerCfg configuration) throws ConfigException |
| | | public ReplicationServer(ChangelogServerCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | shutdown = false; |
| | | runListen = true; |
| | | int changelogPort = configuration.getChangelogPort(); |
| | | changelogServerId = (short) configuration.getChangelogServerId(); |
| | | changelogServers = configuration.getChangelogServer(); |
| | | if (changelogServers == null) |
| | | changelogServers = new ArrayList<String>(); |
| | | replicationServers = configuration.getChangelogServer(); |
| | | if (replicationServers == null) |
| | | replicationServers = new ArrayList<String>(); |
| | | queueSize = configuration.getQueueSize(); |
| | | trimAge = configuration.getChangelogPurgeDelay(); |
| | | dbDirname = configuration.getChangelogDbDirectory(); |
| | |
| | | |
| | | /** |
| | | * The run method for the Listen thread. |
| | | * This thread accepts incoming connections on the changelog server |
| | | * ports from other changelog servers or from LDAP servers |
| | | * This thread accepts incoming connections on the replication server |
| | | * ports from other replication servers or from LDAP servers |
| | | * and spawns further threads responsible for handling those connections |
| | | */ |
| | | |
| | |
| | | Socket newSocket = null; |
| | | while (shutdown == false) |
| | | { |
| | | // Wait on the changelog port. |
| | | // Read incoming messages and create LDAP or Changelog listener and |
| | | // Publisher. |
| | | // Wait on the replicationServer port. |
| | | // Read incoming messages and create LDAP or ReplicationServer listener |
| | | // and Publisher. |
| | | |
| | | try |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * This method manages the connection with the other changelog servers. |
| | | * It periodically checks that this changelog server is indeed connected |
| | | * to all the other changelog servers and if not attempts to |
| | | * This method manages the connection with the other replication servers. |
| | | * It periodically checks that this replication server is indeed connected |
| | | * to all the other replication servers and if not attempts to |
| | | * make the connection. |
| | | */ |
| | | private void runConnect() |
| | |
| | | { |
| | | /* |
| | | * periodically check that we are connected to all other |
| | | * changelog servers and if not establish the connection |
| | | * replication servers and if not establish the connection |
| | | */ |
| | | for (ChangelogCache changelogCache: baseDNs.values()) |
| | | for (ReplicationCache replicationCache: baseDNs.values()) |
| | | { |
| | | Set<String> connectedChangelogs = changelogCache.getChangelogs(); |
| | | Set<String> connectedChangelogs = replicationCache.getChangelogs(); |
| | | /* |
| | | * check that all changelog in the config are in the connected Set |
| | | * if not create the connection |
| | | * check that all replication servers in the config are in the connected |
| | | * Set. If not, create the connection |
| | | */ |
| | | for (String serverURL : changelogServers) |
| | | for (String serverURL : replicationServers) |
| | | { |
| | | if ((serverURL.compareTo(this.serverURL) != 0) && |
| | | (!connectedChangelogs.contains(serverURL))) |
| | | { |
| | | this.connect(serverURL, changelogCache.getBaseDn()); |
| | | this.connect(serverURL, replicationCache.getBaseDn()); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * initialization function for the changelog. |
| | | * initialization function for the replicationServer. |
| | | * |
| | | * @param changelogId The unique identifier for this changelog. |
| | | * @param changelogPort The port on which the changelog should listen. |
| | | * @param changelogId The unique identifier for this replicationServer. |
| | | * @param changelogPort The port on which the replicationServer should |
| | | * listen. |
| | | * |
| | | */ |
| | | private void initialize(short changelogId, int changelogPort) |
| | |
| | | try |
| | | { |
| | | /* |
| | | * Initialize the changelog database. |
| | | * Initialize the replicationServer database. |
| | | */ |
| | | dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(), |
| | | dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(), |
| | | this); |
| | | |
| | | /* |
| | | * create changelog cache |
| | | * create replicationServer cache |
| | | */ |
| | | serverId = changelogId; |
| | | |
| | | /* |
| | | * Open changelog socket |
| | | * Open replicationServer socket |
| | | */ |
| | | String localhostname = InetAddress.getLocalHost().getHostName(); |
| | | String localAdddress = InetAddress.getLocalHost().getHostAddress(); |
| | |
| | | /* |
| | | * create working threads |
| | | */ |
| | | myListenThread = new DirectoryThread(this, "Changelog Listener"); |
| | | myListenThread = new DirectoryThread(this, "Replication Server Listener"); |
| | | myListenThread.start(); |
| | | myConnectThread = new DirectoryThread(this, "Changelog Connect"); |
| | | myConnectThread = new DirectoryThread(this, "Replication Server Connect"); |
| | | myConnectThread.start(); |
| | | |
| | | } catch (DatabaseException e) |
| | |
| | | String message = getMessage(msgID, dbDirname); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } catch (ChangelogDBException e) |
| | | } catch (ReplicationDBException e) |
| | | { |
| | | int msgID = MSGID_COULD_NOT_READ_DB; |
| | | String message = getMessage(msgID, dbDirname); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangelogCache associated to the base DN given in parameter. |
| | | * Get the ReplicationCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ChangelogCache must be returned. |
| | | * @return The ChangelogCache associated to the base DN given in parameter. |
| | | * @param baseDn The base Dn for which the ReplicationCache must be returned. |
| | | * @return The ReplicationCache associated to the base DN given in parameter. |
| | | */ |
| | | public ChangelogCache getChangelogCache(DN baseDn) |
| | | public ReplicationCache getReplicationCache(DN baseDn) |
| | | { |
| | | ChangelogCache changelogCache; |
| | | ReplicationCache replicationCache; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | changelogCache = baseDNs.get(baseDn); |
| | | if (changelogCache == null) |
| | | changelogCache = new ChangelogCache(baseDn, this); |
| | | baseDNs.put(baseDn, changelogCache); |
| | | replicationCache = baseDNs.get(baseDn); |
| | | if (replicationCache == null) |
| | | replicationCache = new ReplicationCache(baseDn, this); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | } |
| | | |
| | | return changelogCache; |
| | | return replicationCache; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Changelog service and all its connections. |
| | | * Shutdown the Replication Server service and all its connections. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | |
| | | listenSocket.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // changelog service is closing anyway. |
| | | // replication Server service is closing anyway. |
| | | } |
| | | |
| | | // shut down every cache registered in baseDNs |
| | | for (ChangelogCache changelogCache : baseDNs.values()) |
| | | for (ReplicationCache replicationCache : baseDNs.values()) |
| | | { |
| | | changelogCache.shutdown(); |
| | | replicationCache.shutdown(); |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new DB handler for this Changelog and the serverId and |
| | | * Creates a new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler must be created. |
| | | * @return The new DB handler for this Changelog and the serverId and |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |