| | |
| | | import java.io.File; |
| | | import java.io.FileFilter; |
| | | import java.io.FileInputStream; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.InputStreamReader; |
| | | import java.io.OutputStreamWriter; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.io.Writer; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | |
| | | * <ul> |
| | | * <li>A "generation_[id].id" file, where [id] is the generation id</li> |
| | | * <li>One directory per server id, named after "[id].server" where [id] is the |
| | | * id of the server. Each directory contains the log files for the given server |
| | | * id.</li> |
| | | * id of the server.</li> |
| | | * </ul> |
| | | * All log files end with the ".log" suffix. Log files always include the "head.log" |
| | | * file and optionally zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file. |
| | | * Each server id directory contains the following files : |
| | | * <ul> |
| | | * <li>The "head.log" file, which is the most recent log file, where new records are appended.</li> |
| | | * <li>Zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file (they all end with the ".log" suffix).</li> |
| | | * <li>Optionally, an "offline.state" file that indicates that this particular server id |
| | | * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li> |
| | | * </ul> |
| | | * See {@code Log} class for details on the log files. |
| | | * |
| | | * <p> |
| | | * Layout example with two domains "o=test1" and "o=test2", each having server |
| | | * ids 22 and 33 : |
| | | * ids 22 and 33, with server id 33 for domain "o=test1" being offline : |
| | | * |
| | | * <pre> |
| | | * +---changelog |
| | |
| | | * | \---1.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * \---offline.state |
| | | * | \---2.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * </pre> |
| | | */ |
| | | class ReplicationEnvironment |
| | |
| | | /** File name "domains.state"; presumably the file holding the domain id to domain DN mapping (usage not visible here, to be confirmed). */ |
| | | private static final String DOMAINS_STATE_FILENAME = "domains.state"; |
| | | /** Name of the file, located in a server id directory, that marks the replica as offline and holds its offline CSN. */ |
| | | static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; |
| | | /** Separator written between the domain id and the domain DN on each line of the domains state file. */ |
| | | private static final String DOMAIN_STATE_SEPARATOR = ":"; |
| | | /** Suffix appended to a domain id to build the name of the domain directory. */ |
| | | private static final String DOMAIN_SUFFIX = ".domain"; |
| | |
| | | /** Suffix of the generation id file name; the class Javadoc describes that file as "generation_[id].id". */ |
| | | private static final String GENERATION_ID_FILE_SUFFIX = ".id"; |
| | | /** Charset name used by newFileWriter() and newFileReader() to encode and decode file content. */ |
| | | private static final String UTF8_ENCODING = "UTF-8"; |
| | | |
| | | private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | |
| | | private final Map<DN, String> domains = new HashMap<DN, String>(); |
| | | |
| | | /** Exclusive lock to guard the domains mapping and change of state to a domain.*/ |
| | | // TODO : review the usefulness of this lock |
| | | private final Object domainLock = new Object(); |
| | | |
| | | /** The underlying replication server. */ |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notify that the replica corresponding to provided domain and provided CSN |
| | | * is offline. |
| | | * <p> |
| | | * The offline CSN is written, as a string, to the "offline.state" file of the |
| | | * replica's server id directory. Any previous content is overwritten, so only |
| | | * the last notified offline CSN is kept. |
| | | * |
| | | * @param domainDN |
| | | * the domain of the offline replica |
| | | * @param offlineCSN |
| | | * the offline replica serverId and offline timestamp |
| | | * @throws ChangelogException |
| | | * if the server id directory does not exist, or if the offline |
| | | * state file cannot be written |
| | | */ |
| | | void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); |
| | | // An unknown domain or server id resolves to a directory that does not exist: reject it here |
| | | if (!serverIdPath.exists()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get( |
| | | domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath())); |
| | | } |
| | | final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | Writer writer = null; |
| | | try |
| | | { |
| | | // Overwrite file, only the last sent offline CSN is kept |
| | | writer = newFileWriter(offlineFile); |
| | | writer.write(offlineCSN.toString()); |
| | | // The writer is buffered, so nothing may have reached the file yet. Flush explicitly |
| | | // so that a write failure is reported below as an IOException, instead of happening |
| | | // during StaticUtils.close() where it is expected to be ignored (to be confirmed). |
| | | writer.flush(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notify that the replica corresponding to provided domain and server id |
| | | * is online, by removing its "offline.state" file when one is present. |
| | | * |
| | | * @param domainDN |
| | | * the domain of the replica |
| | | * @param serverId |
| | | * the replica serverId |
| | | * @throws ChangelogException |
| | | * if the offline state file exists but cannot be deleted |
| | | */ |
| | | void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | { |
| | | final File serverIdPath = getServerIdPath(domains.get(domainDN), serverId); |
| | | final File offlineStateFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | // Nothing to do when the replica is not marked offline; otherwise the marker file must be removed |
| | | if (offlineStateFile.exists() && !offlineStateFile.delete()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get( |
| | | offlineStateFile.getPath(), domainDN.toString(), serverId)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */ |
| | | private void readDomainsStateFile() throws ChangelogException |
| | | { |
| | |
| | | String line = null; |
| | | try |
| | | { |
| | | reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8")); |
| | | reader = newFileReader(domainsStateFile); |
| | | while ((line = reader.readLine()) != null) |
| | | { |
| | | final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR); |
| | |
| | | * Update the changelog state with the state corresponding to the provided |
| | | * domain DN. |
| | | */ |
| | | private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result) |
| | | private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state) |
| | | throws ChangelogException |
| | | { |
| | | final File domainDirectory = getDomainPath(domainEntry.getValue()); |
| | |
| | | replicationRootPath, domainDirectory.getPath())); |
| | | } |
| | | final DN domainDN = domainEntry.getKey(); |
| | | result.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | state.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | |
| | | final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER); |
| | | if (serverIds == null) |
| | |
| | | } |
| | | for (final File serverId : serverIds) |
| | | { |
| | | result.addServerIdToDomain(toServerId(serverId.getName()), domainDN); |
| | | readStateForServerId(domainDN, serverId, state); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the changelog state with the state found in the provided server id directory: |
| | | * the server id is registered for the domain and, when an "offline.state" file is present, |
| | | * the offline CSN it contains is registered as well. |
| | | */ |
| | | private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException |
| | | { |
| | | state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN); |
| | | |
| | | final File offlineStateFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | if (!offlineStateFile.exists()) |
| | | { |
| | | // No offline marker for this replica |
| | | return; |
| | | } |
| | | state.addOfflineReplica(domainDN, readOfflineStateFile(offlineStateFile, domainDN)); |
| | | } |
| | | |
| | | private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException |
| | | { |
| | | BufferedReader reader = null; |
| | | try |
| | | { |
| | | reader = newFileReader(offlineFile); |
| | | String line = reader.readLine(); |
| | | if (line == null || reader.readLine() != null) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineFile.getPath())); |
| | | } |
| | | return new CSN(line); |
| | | } |
| | | catch(IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineFile.getPath()), e); |
| | | } |
| | | finally { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | |
| | | Writer writer = null; |
| | | try |
| | | { |
| | | writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8")); |
| | | writer = newFileWriter(domainsStateFile); |
| | | for (final Entry<DN, String> entry : domains.entrySet()) |
| | | { |
| | | writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey())); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId, |
| | | domainDN.toString(), domainsStateFile.getPath()), e); |
| | |
| | | return new File(replicationRootPath, domainId + DOMAIN_SUFFIX); |
| | | } |
| | | |
| | | private File getServerIdPath(final String domainId, final int serverId) |
| | | /** |
| | | * Return the path for the provided domain id and server id. |
| | | * Package private to be usable in tests. |
| | | * |
| | | * @param domainId |
| | | * The id corresponding to a domain DN |
| | | * @param serverId |
| | | * The server id to retrieve |
| | | * @return the path |
| | | */ |
| | | File getServerIdPath(final String domainId, final int serverId) |
| | | { |
| | | return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX); |
| | | } |
| | |
| | | throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e); |
| | | } |
| | | } |
| | | |
| | | /** Returns a buffered writer on the provided file. */ |
| | | private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING)); |
| | | } |
| | | |
| | | /** Returns a buffered reader on the provided file. */ |
| | | private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING)); |
| | | } |
| | | } |