opends/src/messages/messages/replication.properties
@@ -572,8 +572,8 @@ to file system for log file '%s' SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \ on log file '%s' SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \ log file SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \ directory '%s' for log file SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \ from domain state file '%s', from line '%s' SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \ @@ -604,4 +604,14 @@ SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \ head log file from '%s' to '%s' INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \ size of head log file is %d bytes size of head log file is %d bytes SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \ offline for domain %s and server id %d because the path '%s' does not exist SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s) SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \ replica offline state file '%s' for domain %s SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \ offline state file '%s' for domain %s and server id %d opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2507,7 +2507,7 @@ { if (msg.isReplicaOfflineMsg()) { domainDB.replicaOffline(baseDN, msg.getCSN()); domainDB.notifyReplicaOffline(baseDN, msg.getCSN()); } else { opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -166,8 +166,10 @@ * @param heartbeatCSN * The CSN heartbeat sent by this replica (contains the serverId and * timestamp of the heartbeat) * @throws ChangelogException * If a database problem happened */ void replicaHeartbeat(DN baseDN, CSN heartbeatCSN); void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException; /** * Let the DB know this replica is going down. @@ -186,5 +188,5 @@ * @throws ChangelogException * If a database problem happened */ void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; } opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -672,6 +672,7 @@ final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); indexer.publishUpdateMsg(baseDN, updateMsg); } return wasCreated; @@ -679,25 +680,35 @@ /** {@inheritDoc} */ @Override public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException { final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); indexer.publishHeartbeat(baseDN, heartbeatCSN); } } private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) throws ChangelogException { if (indexer.isReplicaOffline(baseDN, serverId)) { replicationEnv.notifyReplicaOnline(baseDN, serverId); } } /** {@inheritDoc} */ @Override public void replicaOffline(final DN baseDN, final CSN offlineCSN) public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException { replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { indexer.replicaOffline(baseDN, offlineCSN); } // TODO save this state in the changelogStateDB? } /** opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@ /** * The last key appended to the log. In order to keep the ordering of the keys * in the log, any attempt to append a record with a key lower or equal to * this key will silently fail. * this is rejected (no error but an event is logged). */ private K lastAppendedKey; @@ -342,6 +342,10 @@ /** * Add the provided record at the end of this log. * <p> * The record must have a key strictly higher than the key * of the last record added. If it is not the case, the record is not * appended and the method returns immediately. * <p> * In order to ensure that record is written out of buffers and persisted * to file system, it is necessary to explicitely call the * {@code syncToFileSystem()} method. @@ -349,7 +353,7 @@ * @param record * The record to add. * @throws ChangelogException * If the record can't be added to the log. * If an error occurs while adding the record to the log. */ public void append(final Record<K, V> record) throws ChangelogException { @@ -766,7 +770,7 @@ catch (IOException e) { throw new ChangelogException( ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e); ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e); } } @@ -842,8 +846,8 @@ private void openHeadLogFile() throws ChangelogException { final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser); Record<K,V> newestRecord = head.getNewestRecord(); lastAppendedKey = newestRecord == null ? null : newestRecord.getKey(); final Record<K,V> newestRecord = head.getNewestRecord(); lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null; logFiles.put(recordParser.getMaxKey(), head); } @@ -897,7 +901,7 @@ /** * Represents a cursor than can be repositioned on a given key. 
*/ static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>> static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>> { /** * Position the cursor to the record corresponding to the provided key or to @@ -1011,9 +1015,9 @@ if (key != null) { boolean isFound = currentCursor.positionTo(key, findNearest); if (isFound && getRecord() == null) if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile)) { // The key to position to may be in the next file, force the switch // The key to position is probably in the next file, force the switch isFound = next(); } return isFound; @@ -1047,6 +1051,13 @@ currentLogFile = logFile; currentCursor = currentLogFile.getCursor(); } /** {@inheritDoc} */ @Override public String toString() { return String.format("Cursor on log : %s, current log file: %s, current cursor: %s", log.logPath, currentLogFile.getFile().getName(), currentCursor); } } /** An empty cursor, that always return null records and false to {@code next()} method. */ opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@ import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import java.io.Writer; import java.util.HashMap; import java.util.HashSet; @@ -77,15 +79,21 @@ * <ul> * <li>A "generation_[id].id" file, where [id] is the generation id</li> * <li>One directory per server id, named after "[id].server" where [id] is the * id of the server. Each directory contains the log files for the given server * id.</li> * id of the server.</li> * </ul> * All log files end with the ".log" suffix. Log files always include the "head.log" * file and optionally zero to many read-only log files named after the lowest key * and highest key present in the log file. * Each server id directory contains the following files : * <ul> * <li>The "head.log" file, which is the most recent log file where records are appended.</li> * <li>Zero to many read-only log files named after the lowest key * and highest key present in the log file (they all end with the ".log" suffix).</li> * <li>Optionally, an "offline.state" file that indicates that this particular server id * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li> * </ul> * See {@code Log} class for details on the log files. 
* * <p> * Layout example with two domains "o=test1" and "o=test2", each having server * ids 22 and 33 : * ids 22 and 33, with server id 33 for domain "o=test1" being offline : * * <pre> * +---changelog @@ -96,15 +104,16 @@ * | \---1.domain * | \---generation1.id * | \---22.server * | \---head.log * | \---head.log [contains last records written] * | \---33.server * | \---head.log * | \---head.log [contains last records written] * \---offline.state * | \---2.domain * | \---generation1.id * | \---22.server * | \---head.log * | \---head.log [contains last records written] * | \---33.server * | \---head.log * | \---head.log [contains last records written] * </pre> */ class ReplicationEnvironment @@ -120,6 +129,8 @@ private static final String DOMAINS_STATE_FILENAME = "domains.state"; static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; private static final String DOMAIN_STATE_SEPARATOR = ":"; private static final String DOMAIN_SUFFIX = ".domain"; @@ -130,6 +141,8 @@ private static final String GENERATION_ID_FILE_SUFFIX = ".id"; private static final String UTF8_ENCODING = "UTF-8"; private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() { @Override @@ -169,7 +182,6 @@ private final Map<DN, String> domains = new HashMap<DN, String>(); /** Exclusive lock to guard the domains mapping and change of state to a domain.*/ // TODO : review the usefulness of this lock private final Object domainLock = new Object(); /** The underlying replication server. */ @@ -358,6 +370,77 @@ } } /** * Notify that the replica corresponding to provided domain and provided CSN * is offline. 
* * @param domainDN * the domain of the offline replica * @param offlineCSN * the offline replica serverId and offline timestamp * @throws ChangelogException * if a problem occurs */ void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException { synchronized (domainLock) { final String domainId = domains.get(domainDN); final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); if (!serverIdPath.exists()) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get( domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath())); } final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); Writer writer = null; try { // Overwrite file, only the last sent offline CSN is kept writer = newFileWriter(offlineFile); writer.write(offlineCSN.toString()); } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get( domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e); } finally { StaticUtils.close(writer); } } } /** * Notify that the replica corresponding to provided domain and server id * is online. * * @param domainDN * the domain of the replica * @param serverId * the replica serverId * @throws ChangelogException * if a problem occurs */ void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException { synchronized (domainLock) { final String domainId = domains.get(domainDN); final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME); if (offlineFile.exists()) { final boolean isDeleted = offlineFile.delete(); if (!isDeleted) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get( offlineFile.getPath(), domainDN.toString(), serverId)); } } } } /** Reads the domain state file to find mapping between each domainDN and its associated domainId. 
*/ private void readDomainsStateFile() throws ChangelogException { @@ -368,7 +451,7 @@ String line = null; try { reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8")); reader = newFileReader(domainsStateFile); while ((line = reader.readLine()) != null) { final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR); @@ -425,7 +508,7 @@ * Update the changelog state with the state corresponding to the provided * domain DN. */ private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result) private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state) throws ChangelogException { final File domainDirectory = getDomainPath(domainEntry.getValue()); @@ -436,7 +519,7 @@ replicationRootPath, domainDirectory.getPath())); } final DN domainDN = domainEntry.getKey(); result.setDomainGenerationId(domainDN, toGenerationId(generationId)); state.setDomainGenerationId(domainDN, toGenerationId(generationId)); final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER); if (serverIds == null) @@ -446,7 +529,43 @@ } for (final File serverId : serverIds) { result.addServerIdToDomain(toServerId(serverId.getName()), domainDN); readStateForServerId(domainDN, serverId, state); } } private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException { state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN); final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); if (offlineFile.exists()) { final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN); state.addOfflineReplica(domainDN, offlineCSN); } } private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException { BufferedReader reader = null; try { reader = newFileReader(offlineFile); String line = reader.readLine(); if (line == null || reader.readLine() != null) { throw new 
ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get( domainDN.toString(), offlineFile.getPath())); } return new CSN(line); } catch(IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get( domainDN.toString(), offlineFile.getPath()), e); } finally { StaticUtils.close(reader); } } @@ -458,13 +577,13 @@ Writer writer = null; try { writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8")); writer = newFileWriter(domainsStateFile); for (final Entry<DN, String> entry : domains.entrySet()) { writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey())); } } catch (Exception e) catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId, domainDN.toString(), domainsStateFile.getPath()), e); @@ -561,7 +680,17 @@ return new File(replicationRootPath, domainId + DOMAIN_SUFFIX); } private File getServerIdPath(final String domainId, final int serverId) /** * Return the path for the provided domain id and server id. * Package private to be usable in tests. * * @param domainId * The id corresponding to a domain DN * @param serverId * The server id to retrieve * @return the path */ File getServerIdPath(final String domainId, final int serverId) { return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX); } @@ -673,4 +802,16 @@ throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e); } } /** Returns a buffered writer on the provided file. */ private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException { return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING)); } /** Returns a buffered reader on the provided file. 
*/ private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException { return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING)); } } opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -187,6 +187,21 @@ } /** * Indicates if the replica corresponding to provided domain DN and server id * is offline. * * @param domainDN * base DN of the replica * @param serverId * server id of the replica * @return {@code true} if replica is offline, {@code false} otherwise */ public boolean isReplicaOffline(DN domainDN, int serverId) { return replicasOffline.getCSN(domainDN, serverId) != null; } /** * Ensures the medium consistency point is updated by UpdateMsg. * * @param baseDN opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -756,6 +756,7 @@ final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); indexer.publishUpdateMsg(baseDN, updateMsg); } return wasCreated; @@ -763,21 +764,31 @@ /** {@inheritDoc} */ @Override public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException { final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); indexer.publishHeartbeat(baseDN, heartbeatCSN); } } private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) throws ChangelogException { if (indexer.isReplicaOffline(baseDN, serverId)) { dbEnv.notifyReplicaOnline(baseDN, serverId); } } /** {@inheritDoc} */ @Override public void replicaOffline(DN baseDN, CSN offlineCSN) public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException { dbEnv.addOfflineReplica(baseDN, offlineCSN); dbEnv.notifyReplicaOffline(baseDN, offlineCSN); final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -505,14 +505,32 @@ */ static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) { final byte[] key = toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId() + FIELD_SEPARATOR + baseDN.toNormalizedString()); final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId()); final ByteStringBuilder data = new ByteStringBuilder(8); // store a long data.append(offlineCSN.getTime()); return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray()); } /** * Return the key for a replica offline entry in the changelog state database. * * @param baseDN * the replica's baseDN * @param serverId * the replica's serverId * @return the key used in the database to store offline time of the replica */ private static byte[] toReplicaOfflineKey(DN baseDN, int serverId) { return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString()); } /** Returns an entry with the provided key and a null value. */ private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key) { return new SimpleImmutableEntry<byte[], byte[]>(key, null); } private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) throws ChangelogException, RuntimeException { @@ -689,7 +707,9 @@ } /** * Add the information about an offline replica to the changelog state DB. * Notify that replica is offline. * <p> * This information is stored in the changelog state DB. * * @param baseDN * the domain of the offline replica @@ -698,7 +718,7 @@ * @throws ChangelogException * if a database problem occurred */ public void addOfflineReplica(DN baseDN, CSN offlineCSN) public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException { // just overwrite any older entry as it is assumed a newly received offline @@ -707,6 +727,25 @@ "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); } /** * Notify that replica is online. 
* <p> * Update the changelog state DB if necessary (ie, replica was known to be * offline). * * @param baseDN * the domain of replica * @param serverId * the serverId of replica * @throws ChangelogException * if a database problem occurred */ public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException { deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); } private void putInChangelogStateDB(Entry<byte[], byte[]> entry, String methodInvocation) throws ChangelogException { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -26,8 +26,8 @@ package org.opends.server.replication.server.changelog.file; import static org.assertj.core.api.Assertions.*; import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; import java.io.Closeable; import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ -37,12 +37,14 @@ import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import org.opends.server.util.TimeThread; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -50,6 +52,9 @@ @SuppressWarnings("javadoc") public class ReplicationEnvironmentTest extends DirectoryServerTestCase { private static final int SERVER_ID_1 = 1; private static final int SERVER_ID_2 = 2; private static final String DN1_AS_STRING = "cn=test1,dc=company.com"; private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com"; private static final String DN3_AS_STRING = "cn=test3,dc=company.com"; @@ -74,70 +79,231 @@ } @Test public void testCreateThenReadChangelogStateWithSingleDN() throws Exception public void testReadChangelogStateWithSingleDN() throws Exception { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment 
environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); StaticUtils.close(cnDB, replicaDB, replicaDB2); ChangelogState state = environment.readChangelogState(); ChangelogState state = environment.readChangelogState(); assertThat(state.getDomainToServerIds()).containsKeys(domainDN); assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2); assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); assertThat(state.getDomainToServerIds()).containsKeys(domainDN); assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2); assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); } finally { StaticUtils.close(cnDB, replicaDB, replicaDB2); } } @Test public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception public void testReadChangelogStateWithMultipleDN() throws Exception { File rootPath = new File(TEST_DIRECTORY_CHANGELOG); List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING)); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); Log<Long,ChangeNumberIndexRecord> cnDB = null; List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>(); for (int i = 0; i <= 2 ; i++) try { for (int j = 1; j <= 10; j++) File 
rootPath = new File(TEST_DIRECTORY_CHANGELOG); List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING)); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); for (int i = 0; i <= 2 ; i++) { replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1)); for (int j = 1; j <= 10; j++) { // 3 domains, 10 server id each, generation id is different for each domain replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1)); } } ChangelogState state = environment.readChangelogState(); assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2)); for (int i = 0; i <= 2 ; i++) { assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } assertThat(state.getDomainToGenerationId()).containsOnly( MapEntry.entry(domainDNs.get(0), 1L), MapEntry.entry(domainDNs.get(1), 2L), MapEntry.entry(domainDNs.get(2), 3L)); } StaticUtils.close(cnDB); StaticUtils.close(replicaDBs.toArray(new Closeable[] {})); ChangelogState state = environment.readChangelogState(); assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2)); for (int i = 0; i <= 2 ; i++) finally { assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); StaticUtils.close(cnDB); StaticUtils.close(replicaDBs); } assertThat(state.getDomainToGenerationId()).containsOnly( MapEntry.entry(domainDNs.get(0), 1L), MapEntry.entry(domainDNs.get(1), 2L), MapEntry.entry(domainDNs.get(2), 3L)); } @Test public void testReadChangelogStateWithReplicaOffline() throws Exception { Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); 
ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); // put server id 1 offline CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); environment.notifyReplicaOffline(domainDN, offlineCSN); ChangelogState state = environment.readChangelogState(); assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); } finally { StaticUtils.close(cnDB, replicaDB); } } @Test(expectedExceptions=ChangelogException.class) public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception { Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME); offlineStateFile.createNewFile(); environment.readChangelogState(); } finally { StaticUtils.close(cnDB, replicaDB); } } @Test public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception { Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); // put server id 1 offline twice CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100); environment.notifyReplicaOffline(domainDN, 
csnGenerator.newCSN()); CSN lastOfflineCSN = csnGenerator.newCSN(); environment.notifyReplicaOffline(domainDN, lastOfflineCSN); ChangelogState state = environment.readChangelogState(); assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN))); } finally { StaticUtils.close(cnDB, replicaDB); } } @Test public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception { Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); // put server id 1 offline environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1)); // put server id 1 online again environment.notifyReplicaOnline(domainDN, SERVER_ID_1); ChangelogState state = environment.readChangelogState(); assertThat(state.getOfflineReplicas()).isEmpty(); } finally { StaticUtils.close(cnDB, replicaDB); } } @Test public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception { Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null; try { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); final DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); cnDB = environment.getOrCreateCNIndexDB(); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); environment.notifyReplicaOffline(domainDN, offlineCSN); ChangelogState state = environment.readChangelogState(); assertThat(state.getDomainToServerIds()).containsKeys(domainDN); 
assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1); assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); } finally { StaticUtils.close(cnDB, replicaDB); } } @Test(expectedExceptions=ChangelogException.class) public void testMissingDomainDirectory() throws Exception { File rootPath = new File(TEST_DIRECTORY_CHANGELOG); DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); StaticUtils.close(replicaDB, replicaDB2); Log<Long,ChangeNumberIndexRecord> cnDB = null; Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null; try { File rootPath = new File(TEST_DIRECTORY_CHANGELOG); DN domainDN = DN.decode(DN1_AS_STRING); ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); // delete the domain directory created for the 2 replica DBs to break the // consistency with domain state file StaticUtils.recursiveDelete(new File(rootPath, "1.domain")); // delete the domain directory created for the 2 replica DBs to break the // consistency with domain state file StaticUtils.recursiveDelete(new File(rootPath, "1.domain")); environment.readChangelogState(); environment.readChangelogState(); } finally { StaticUtils.close(cnDB, replicaDB, replicaDB2); } } }