OPENDJ-1467 : File Based Changelog must support replicas temporarily leaving the topology
* File-based changelog
Store the offline CSN when a replica goes offline
Read the offline CSN if present to build the changelog state at startup
Update ReplicationEnvironment.java to manage read and write from/to storage
Update ReplicationEnvironmentTest.java with more unit tests
Update replication.properties with new messages
* File and JE based changelog
Remove the offline CSN if present when receiving a heartbeat or an
update message
Update FileChangelogDB.java, JEChangelogDB.java, ReplicationDBEnv.java to
manage online replica notification
* Other minor updates : renaming, comments
| | |
| | | to file system for log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \ |
| | | on log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \ |
| | | log file |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \ |
| | | directory '%s' for log file |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \ |
| | | from domain state file '%s', from line '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \ |
| | |
| | | head log file from '%s' to '%s' |
| | | INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \ |
| | | size of head log file is %d bytes |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \ |
| | | offline for domain %s and server id %d because the path '%s' does not exist |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \ |
| | | replica information for domain %s and server id %d, using path '%s' (offline CSN is %s) |
| | | SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \ |
| | | state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \ |
| | | replica offline state file '%s' for domain %s |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \ |
| | | offline state file '%s' for domain %s and server id %d |
| | |
| | | { |
| | | if (msg.isReplicaOfflineMsg()) |
| | | { |
| | | domainDB.replicaOffline(baseDN, msg.getCSN()); |
| | | domainDB.notifyReplicaOffline(baseDN, msg.getCSN()); |
| | | } |
| | | else |
| | | { |
| | |
| | | * @param heartbeatCSN |
| | | * The CSN heartbeat sent by this replica (contains the serverId and |
| | | * timestamp of the heartbeat) |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | void replicaHeartbeat(DN baseDN, CSN heartbeatCSN); |
| | | void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Let the DB know this replica is going down. |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; |
| | | void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; |
| | | } |
| | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) |
| | | public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); |
| | | indexer.publishHeartbeat(baseDN, heartbeatCSN); |
| | | } |
| | | } |
| | | |
| | | private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) |
| | | throws ChangelogException |
| | | { |
| | | if (indexer.isReplicaOffline(baseDN, serverId)) |
| | | { |
| | | replicationEnv.notifyReplicaOnline(baseDN, serverId); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaOffline(final DN baseDN, final CSN offlineCSN) |
| | | public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.replicaOffline(baseDN, offlineCSN); |
| | | } |
| | | // TODO save this state in the changelogStateDB? |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * The last key appended to the log. In order to keep the ordering of the keys |
| | | * in the log, any attempt to append a record with a key lower or equal to |
| | | * this key will silently fail. |
| | | * this is rejected (no error but an event is logged). |
| | | */ |
| | | private K lastAppendedKey; |
| | | |
| | |
| | | /** |
| | | * Add the provided record at the end of this log. |
| | | * <p> |
| | | * The record must have a key strictly higher than the key |
| | | * of the last record added. If it is not the case, the record is not |
| | | * appended and the method returns immediately. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | | * to file system, it is necessary to explicitly call the |
| | | * {@code syncToFileSystem()} method. |
| | |
| | | * @param record |
| | | * The record to add. |
| | | * @throws ChangelogException |
| | | * If the record can't be added to the log. |
| | | * If an error occurs while adding the record to the log. |
| | | */ |
| | | public void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e); |
| | | ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | |
| | | private void openHeadLogFile() throws ChangelogException |
| | | { |
| | | final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser); |
| | | Record<K,V> newestRecord = head.getNewestRecord(); |
| | | lastAppendedKey = newestRecord == null ? null : newestRecord.getKey(); |
| | | final Record<K,V> newestRecord = head.getNewestRecord(); |
| | | lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null; |
| | | logFiles.put(recordParser.getMaxKey(), head); |
| | | } |
| | | |
| | |
| | | if (key != null) |
| | | { |
| | | boolean isFound = currentCursor.positionTo(key, findNearest); |
| | | if (isFound && getRecord() == null) |
| | | if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile)) |
| | | { |
| | | // The key to position to may be in the next file, force the switch |
| | | // The key to position is probably in the next file, force the switch |
| | | isFound = next(); |
| | | } |
| | | return isFound; |
| | |
| | | currentLogFile = logFile; |
| | | currentCursor = currentLogFile.getCursor(); |
| | | } |
| | | |
| | | /** |
| | |  * {@inheritDoc} |
| | |  * <p> |
| | |  * The returned string contains the log path, the name of the current log file |
| | |  * and the current cursor. |
| | |  */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   return String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | |       log.logPath, currentLogFile.getFile().getName(), currentCursor); |
| | | } |
| | | } |
| | | |
| | | /** An empty cursor that always returns null records and false from the {@code next()} method. */ |
| | |
| | | import java.io.File; |
| | | import java.io.FileFilter; |
| | | import java.io.FileInputStream; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.InputStreamReader; |
| | | import java.io.OutputStreamWriter; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.io.Writer; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | |
| | | * <ul> |
| | | * <li>A "generation_[id].id" file, where [id] is the generation id</li> |
| | | * <li>One directory per server id, named after "[id].server" where [id] is the |
| | | * id of the server. Each directory contains the log files for the given server |
| | | * id.</li> |
| | | * id of the server.</li> |
| | | * </ul> |
| | | * All log files end with the ".log" suffix. Log files always include the "head.log" |
| | | * file and optionally zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file. |
| | | * Each server id directory contains the following files : |
| | | * <ul> |
| | | * <li>The "head.log" file, which is the most recent log file where records are appended.</li> |
| | | * <li>Zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file (they all end with the ".log" suffix).</li> |
| | | * <li>Optionally, an "offline.state" file that indicates that this particular server id |
| | | * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li> |
| | | * </ul> |
| | | * See {@code Log} class for details on the log files. |
| | | * |
| | | * <p> |
| | | * Layout example with two domains "o=test1" and "o=test2", each having server |
| | | * ids 22 and 33 : |
| | | * ids 22 and 33, with server id 33 for domain "o=test1" being offline : |
| | | * |
| | | * <pre> |
| | | * +---changelog |
| | |
| | | * | \---1.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * \---offline.state |
| | | * | \---2.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log |
| | | * | \---head.log [contains last records written] |
| | | * </pre> |
| | | */ |
| | | class ReplicationEnvironment |
| | |
| | | |
| | | private static final String DOMAINS_STATE_FILENAME = "domains.state"; |
| | | |
| | | static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; |
| | | |
| | | private static final String DOMAIN_STATE_SEPARATOR = ":"; |
| | | |
| | | private static final String DOMAIN_SUFFIX = ".domain"; |
| | |
| | | |
| | | private static final String GENERATION_ID_FILE_SUFFIX = ".id"; |
| | | |
| | | private static final String UTF8_ENCODING = "UTF-8"; |
| | | |
| | | private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | |
| | | private final Map<DN, String> domains = new HashMap<DN, String>(); |
| | | |
| | | /** Exclusive lock to guard the domains mapping and change of state to a domain.*/ |
| | | // TODO : review the usefulness of this lock |
| | | private final Object domainLock = new Object(); |
| | | |
| | | /** The underlying replication server. */ |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Notify that the replica corresponding to provided domain and provided CSN |
| | |  * is offline. |
| | |  * <p> |
| | |  * The offline CSN is written as a string to the "offline.state" file located |
| | |  * in the directory of the replica's server id. Any previous content of this |
| | |  * file is overwritten. |
| | |  * |
| | |  * @param domainDN |
| | |  *          the domain of the offline replica |
| | |  * @param offlineCSN |
| | |  *          the offline replica serverId and offline timestamp |
| | |  * @throws ChangelogException |
| | |  *           if the server id directory does not exist, or if the offline |
| | |  *           state file can't be written |
| | |  */ |
| | | void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException |
| | | { |
| | |   synchronized (domainLock) |
| | |   { |
| | |     final String domainId = domains.get(domainDN); |
| | |     final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); |
| | |     if (!serverIdPath.exists()) |
| | |     { |
| | |       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get( |
| | |           domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath())); |
| | |     } |
| | |     final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | |     Writer writer = null; |
| | |     try |
| | |     { |
| | |       // Overwrite file, only the last sent offline CSN is kept |
| | |       writer = newFileWriter(offlineFile); |
| | |       writer.write(offlineCSN.toString()); |
| | |       // The writer is buffered: flush inside the try block so that an I/O failure |
| | |       // is reported through the catch below rather than surfacing only at close time. |
| | |       writer.flush(); |
| | |     } |
| | |     catch (IOException e) |
| | |     { |
| | |       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get( |
| | |           domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e); |
| | |     } |
| | |     finally |
| | |     { |
| | |       StaticUtils.close(writer); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Notify that the replica identified by the provided domain and server id |
| | |  * is online. |
| | |  * <p> |
| | |  * If an "offline.state" file exists in the directory of the server id, it is deleted. |
| | |  * |
| | |  * @param domainDN |
| | |  *          the domain of the replica |
| | |  * @param serverId |
| | |  *          the replica serverId |
| | |  * @throws ChangelogException |
| | |  *           if the existing offline state file can't be deleted |
| | |  */ |
| | | void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException |
| | | { |
| | |   synchronized (domainLock) |
| | |   { |
| | |     final File serverIdPath = getServerIdPath(domains.get(domainDN), serverId); |
| | |     final File offlineStateFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | |     // No file means there is nothing to do; an existing file that can't be removed is an error |
| | |     if (offlineStateFile.exists() && !offlineStateFile.delete()) |
| | |     { |
| | |       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get( |
| | |           offlineStateFile.getPath(), domainDN.toString(), serverId)); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */ |
| | | private void readDomainsStateFile() throws ChangelogException |
| | | { |
| | |
| | | String line = null; |
| | | try |
| | | { |
| | | reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8")); |
| | | reader = newFileReader(domainsStateFile); |
| | | while ((line = reader.readLine()) != null) |
| | | { |
| | | final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR); |
| | |
| | | * Update the changelog state with the state corresponding to the provided |
| | | * domain DN. |
| | | */ |
| | | private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result) |
| | | private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state) |
| | | throws ChangelogException |
| | | { |
| | | final File domainDirectory = getDomainPath(domainEntry.getValue()); |
| | |
| | | replicationRootPath, domainDirectory.getPath())); |
| | | } |
| | | final DN domainDN = domainEntry.getKey(); |
| | | result.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | state.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | |
| | | final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER); |
| | | if (serverIds == null) |
| | |
| | | } |
| | | for (final File serverId : serverIds) |
| | | { |
| | | result.addServerIdToDomain(toServerId(serverId.getName()), domainDN); |
| | | readStateForServerId(domainDN, serverId, state); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Update the changelog state with the state found in the provided server id |
| | |  * directory: the server id itself, plus the offline CSN when an offline state |
| | |  * file is present. |
| | |  */ |
| | | private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException |
| | | { |
| | |   state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN); |
| | | |
| | |   final File offlineStateFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | |   if (!offlineStateFile.exists()) |
| | |   { |
| | |     // No offline state file in this directory: no offline CSN to register |
| | |     return; |
| | |   } |
| | |   state.addOfflineReplica(domainDN, readOfflineStateFile(offlineStateFile, domainDN)); |
| | | } |
| | | |
| | | /** |
| | |  * Reads the offline CSN stored in the provided replica offline state file. |
| | |  * <p> |
| | |  * The file must contain exactly one line, which is used to build the CSN. |
| | |  * |
| | |  * @param offlineFile |
| | |  *          the replica offline state file to read |
| | |  * @param domainDN |
| | |  *          the domain the file belongs to, only used in error messages |
| | |  * @return the offline CSN read from the file |
| | |  * @throws ChangelogException |
| | |  *           if the file can't be read, or if it does not contain exactly one line |
| | |  */ |
| | | private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException |
| | | { |
| | |   BufferedReader reader = null; |
| | |   try |
| | |   { |
| | |     reader = newFileReader(offlineFile); |
| | |     String line = reader.readLine(); |
| | |     // Reject both an empty file and a file with more than one line |
| | |     if (line == null || reader.readLine() != null) |
| | |     { |
| | |       // Message arguments are: file path first, then domain |
| | |       throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get( |
| | |           offlineFile.getPath(), domainDN.toString())); |
| | |     } |
| | |     return new CSN(line); |
| | |   } |
| | |   catch (IOException e) |
| | |   { |
| | |     // Message arguments are: file path first, then domain |
| | |     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get( |
| | |         offlineFile.getPath(), domainDN.toString()), e); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(reader); |
| | |   } |
| | | } |
| | | |
| | |
| | | Writer writer = null; |
| | | try |
| | | { |
| | | writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8")); |
| | | writer = newFileWriter(domainsStateFile); |
| | | for (final Entry<DN, String> entry : domains.entrySet()) |
| | | { |
| | | writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey())); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId, |
| | | domainDN.toString(), domainsStateFile.getPath()), e); |
| | |
| | | return new File(replicationRootPath, domainId + DOMAIN_SUFFIX); |
| | | } |
| | | |
| | | private File getServerIdPath(final String domainId, final int serverId) |
| | | /** |
| | | * Return the path for the provided domain id and server id. |
| | | * Package private to be usable in tests. |
| | | * |
| | | * @param domainId |
| | | * The id corresponding to a domain DN |
| | | * @param serverId |
| | | * The server id to retrieve |
| | | * @return the path |
| | | */ |
| | | File getServerIdPath(final String domainId, final int serverId) |
| | | { |
| | | return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX); |
| | | } |
| | |
| | | throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e); |
| | | } |
| | | } |
| | | |
| | | /** Returns a buffered writer on the provided file. */ |
| | | private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING)); |
| | | } |
| | | |
| | | /** Returns a buffered reader on the provided file. */ |
| | | private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING)); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Tells whether the replica identified by the provided domain DN and server |
| | |  * id is offline, i.e. whether an offline CSN is currently recorded for it. |
| | |  * |
| | |  * @param domainDN |
| | |  *          base DN of the replica |
| | |  * @param serverId |
| | |  *          server id of the replica |
| | |  * @return {@code true} if replica is offline, {@code false} otherwise |
| | |  */ |
| | | public boolean isReplicaOffline(DN domainDN, int serverId) |
| | | { |
| | |   final boolean hasOfflineCSN = replicasOffline.getCSN(domainDN, serverId) != null; |
| | |   return hasOfflineCSN; |
| | | } |
| | | |
| | | /** |
| | | * Ensures the medium consistency point is updated by UpdateMsg. |
| | | * |
| | | * @param baseDN |
| | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) |
| | | public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); |
| | | indexer.publishHeartbeat(baseDN, heartbeatCSN); |
| | | } |
| | | } |
| | | |
| | | private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) |
| | | throws ChangelogException |
| | | { |
| | | if (indexer.isReplicaOffline(baseDN, serverId)) |
| | | { |
| | | dbEnv.notifyReplicaOnline(baseDN, serverId); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | dbEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | dbEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | |
| | | */ |
| | | static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) |
| | | { |
| | | final byte[] key = |
| | | toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId() |
| | | + FIELD_SEPARATOR + baseDN.toNormalizedString()); |
| | | final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId()); |
| | | final ByteStringBuilder data = new ByteStringBuilder(8); // store a long |
| | | data.append(offlineCSN.getTime()); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray()); |
| | | } |
| | | |
| | | /** |
| | | * Return the key for a replica offline entry in the changelog state database. |
| | | * |
| | | * @param baseDN |
| | | * the replica's baseDN |
| | | * @param serverId |
| | | * the replica's serverId |
| | | * @return the key used in the database to store offline time of the replica |
| | | */ |
| | | private static byte[] toReplicaOfflineKey(DN baseDN, int serverId) |
| | | { |
| | | return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString()); |
| | | } |
| | | |
| | | /** Returns an entry with the provided key and a null value. */ |
| | | private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key) |
| | | { |
| | | return new SimpleImmutableEntry<byte[], byte[]>(key, null); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add the information about an offline replica to the changelog state DB. |
| | | * Notify that replica is offline. |
| | | * <p> |
| | | * This information is stored in the changelog state DB. |
| | | * |
| | | * @param baseDN |
| | | * the domain of the offline replica |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | } |
| | | |
| | | /** |
| | |  * Notify that replica is online. |
| | |  * <p> |
| | |  * The replica offline entry built from the provided domain and server id is |
| | |  * deleted from the changelog state DB. |
| | |  * |
| | |  * @param baseDN |
| | |  *          the domain of replica |
| | |  * @param serverId |
| | |  *          the serverId of replica |
| | |  * @throws ChangelogException |
| | |  *           if a database problem occurred |
| | |  */ |
| | | public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | |   final byte[] offlineKey = toReplicaOfflineKey(baseDN, serverId); |
| | |   final Entry<byte[], byte[]> offlineEntry = toEntryWithNullValue(offlineKey); |
| | |   final String methodInvocation = "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"; |
| | |   deleteFromChangelogStateDB(offlineEntry, methodInvocation); |
| | | } |
| | | |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | |
| | | @SuppressWarnings("javadoc") |
| | | public class ReplicationEnvironmentTest extends DirectoryServerTestCase |
| | | { |
| | | private static final int SERVER_ID_1 = 1; |
| | | private static final int SERVER_ID_2 = 2; |
| | | |
| | | private static final String DN1_AS_STRING = "cn=test1,dc=company.com"; |
| | | private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com"; |
| | | private static final String DN3_AS_STRING = "cn=test3,dc=company.com"; |
| | |
| | | } |
| | | |
| | | @Test |
| | | public void testCreateThenReadChangelogStateWithSingleDN() throws Exception |
| | | public void testReadChangelogStateWithSingleDN() throws Exception |
| | | { |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | | Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null; |
| | | try |
| | | { |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.decode(DN1_AS_STRING); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); |
| | | Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); |
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | |
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN); |
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2); |
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception |
| | | public void testReadChangelogStateWithMultipleDN() throws Exception |
| | | { |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | | List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>(); |
| | | try |
| | | { |
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING)); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>(); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | for (int i = 0; i <= 2 ; i++) |
| | | { |
| | | for (int j = 1; j <= 10; j++) |
| | | { |
| | | // 3 domains, 10 server id each, generation id is different for each domain |
| | | replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1)); |
| | | } |
| | | } |
| | | StaticUtils.close(cnDB); |
| | | StaticUtils.close(replicaDBs.toArray(new Closeable[] {})); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | |
| | |
| | | MapEntry.entry(domainDNs.get(1), 2L), |
| | | MapEntry.entry(domainDNs.get(2), 3L)); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cnDB); |
| | | StaticUtils.close(replicaDBs); |
| | | } |
| | | } |
| | | |
| | | /** Checks that an offline CSN notified for a replica is found in the changelog state read afterwards. */ |
| | | @Test |
| | | public void testReadChangelogStateWithReplicaOffline() throws Exception |
| | | { |
| | |   Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | |   Log<CSN,UpdateMsg> replicaDB = null; |
| | |   try |
| | |   { |
| | |     final String changelogPath = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath(); |
| | |     final DN domainDN = DN.decode(DN1_AS_STRING); |
| | |     final ReplicationEnvironment env = new ReplicationEnvironment(changelogPath, null); |
| | |     cnDB = env.getOrCreateCNIndexDB(); |
| | |     replicaDB = env.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |     // server id 1 goes offline |
| | |     final CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | |     env.notifyReplicaOffline(domainDN, offlineCSN); |
| | | |
| | |     final ChangelogState changelogState = env.readChangelogState(); |
| | | |
| | |     assertThat(changelogState.getOfflineReplicas()) |
| | |         .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(cnDB, replicaDB); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Checks that reading the changelog state fails with a ChangelogException |
| | |  * when the offline state file of a replica is empty. |
| | |  */ |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception |
| | | { |
| | |   Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | |   Log<CSN,UpdateMsg> replicaDB = null; |
| | |   try |
| | |   { |
| | |     final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | |     final DN domainDN = DN.decode(DN1_AS_STRING); |
| | |     ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | |     cnDB = environment.getOrCreateCNIndexDB(); |
| | |     replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |     // Create an empty offline state file: it is invalid because it does not contain the offline CSN line. |
| | |     // "1" is the domain id used by this test for the directory of the only domain created above. |
| | |     File offlineStateFile = |
| | |         new File(environment.getServerIdPath("1", SERVER_ID_1), REPLICA_OFFLINE_STATE_FILENAME); |
| | |     offlineStateFile.createNewFile(); |
| | | |
| | |     environment.readChangelogState(); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(cnDB, replicaDB); |
| | |   } |
| | | } |
| | | |
| | | /** Checks that only the last offline CSN is kept when a replica is notified offline twice. */ |
| | | @Test |
| | | public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception |
| | | { |
| | |   Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | |   Log<CSN,UpdateMsg> replicaDB = null; |
| | |   try |
| | |   { |
| | |     final String changelogPath = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath(); |
| | |     final DN domainDN = DN.decode(DN1_AS_STRING); |
| | | |
| | |     final ReplicationEnvironment env = new ReplicationEnvironment(changelogPath, null); |
| | |     cnDB = env.getOrCreateCNIndexDB(); |
| | |     replicaDB = env.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |     // server id 1 is notified offline twice, with two successive CSNs |
| | |     final CSNGenerator generator = new CSNGenerator(SERVER_ID_1, 100); |
| | |     final CSN firstOfflineCSN = generator.newCSN(); |
| | |     env.notifyReplicaOffline(domainDN, firstOfflineCSN); |
| | |     final CSN lastOfflineCSN = generator.newCSN(); |
| | |     env.notifyReplicaOffline(domainDN, lastOfflineCSN); |
| | | |
| | |     final ChangelogState changelogState = env.readChangelogState(); |
| | | |
| | |     assertThat(changelogState.getOfflineReplicas()) |
| | |         .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN))); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(cnDB, replicaDB); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Checks that a replica notified offline and then online again is not |
| | | * reported among the offline replicas of the changelog state read afterwards. |
| | | * |
| | | * @throws Exception |
| | | * if an unexpected failure occurs |
| | | */ |
| | | @Test |
| | | public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception |
| | | { |
| | | Log<Long,ChangeNumberIndexRecord> cnIndexLog = null; |
| | | Log<CSN,UpdateMsg> replicaLog = null; |
| | | try |
| | | { |
| | | final DN baseDN = DN.decode(DN1_AS_STRING); |
| | | final String changelogRoot = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath(); |
| | | final ReplicationEnvironment env = new ReplicationEnvironment(changelogRoot, null); |
| | | cnIndexLog = env.getOrCreateCNIndexDB(); |
| | | replicaLog = env.getOrCreateReplicaDB(baseDN, SERVER_ID_1, 1); |
| | | |
| | | // server id 1 goes offline ... |
| | | final CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | | env.notifyReplicaOffline(baseDN, offlineCSN); |
| | | // ... then comes back online |
| | | env.notifyReplicaOnline(baseDN, SERVER_ID_1); |
| | | |
| | | // no offline replica must remain in the state |
| | | assertThat(env.readChangelogState().getOfflineReplicas()).isEmpty(); |
| | | } |
| | | finally |
| | | { |
| | | // release the logs whatever the outcome of the test |
| | | StaticUtils.close(cnIndexLog, replicaLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks the whole changelog state read back after creating a replica DB and |
| | | * notifying that the replica is offline: server ids, generation id and |
| | | * offline CSN of the domain. |
| | | * |
| | | * @throws Exception |
| | | * if an unexpected failure occurs |
| | | */ |
| | | @Test |
| | | public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception |
| | | { |
| | | Log<Long,ChangeNumberIndexRecord> cnIndexLog = null; |
| | | Log<CSN,UpdateMsg> replicaLog = null; |
| | | try |
| | | { |
| | | final DN baseDN = DN.decode(DN1_AS_STRING); |
| | | final String changelogRoot = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath(); |
| | | final ReplicationEnvironment env = new ReplicationEnvironment(changelogRoot, null); |
| | | cnIndexLog = env.getOrCreateCNIndexDB(); |
| | | replicaLog = env.getOrCreateReplicaDB(baseDN, SERVER_ID_1, 1); |
| | | |
| | | // server id 1 goes offline |
| | | final CSN expectedOfflineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | | env.notifyReplicaOffline(baseDN, expectedOfflineCSN); |
| | | |
| | | // read the state once, then check each part of it |
| | | final ChangelogState readState = env.readChangelogState(); |
| | | |
| | | assertThat(readState.getDomainToServerIds()).containsKeys(baseDN); |
| | | assertThat(readState.getDomainToServerIds().get(baseDN)).containsOnly(SERVER_ID_1); |
| | | assertThat(readState.getDomainToGenerationId()).containsExactly(MapEntry.entry(baseDN, 1L)); |
| | | assertThat(readState.getOfflineReplicas()) |
| | | .containsExactly(MapEntry.entry(baseDN, Arrays.asList(expectedOfflineCSN))); |
| | | } |
| | | finally |
| | | { |
| | | // release the logs whatever the outcome of the test |
| | | StaticUtils.close(cnIndexLog, replicaLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks the behavior when the directory of a domain has been removed from |
| | | * disk after its replica DBs were created: the test is expected to end with a |
| | | * {@link ChangelogException}. |
| | | * |
| | | * @throws Exception |
| | | * the expected {@link ChangelogException}, or any unexpected failure |
| | | */ |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testMissingDomainDirectory() throws Exception |
| | | { |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null; |
| | | Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null; |
| | | try |
| | | { |
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | DN domainDN = DN.decode(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | // assign to the variables declared before the try block (no re-declaration) |
| | | // so that the finally clause is the single place where the logs are closed |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); |
| | | |
| | | // delete the domain directory created for the 2 replica DBs to break the |
| | | // consistency with domain state file |
| | | // NOTE(review): assumes the first domain is stored under "<root>/1.domain" |
| | | // - confirm against the directory naming used by ReplicationEnvironment |
| | | StaticUtils.recursiveDelete(new File(rootPath, "1.domain")); |
| | | |
| | | environment.readChangelogState(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2); |
| | | } |
| | | } |
| | | } |