OPENDJ-1453 Replica offline messages should be synced with updates
In r10840, the change to JE/FileChangelogDB.getCursorFrom(DN, ServerState) unnecessarily triggers a lot of calls to the underlying DB (JE or file based) to retrieve the ChangelogState.
As an optimization, keeping an in-memory version of the ChangelogState in synch with the on-disk version will help.
ChangelogState.java:
Now thread safe.
Added removeOfflineReplica(), isEqualTo().
Changed domainToServerIds from Map<DN, List<Integer>> to Map<DN, Set<Integer>>.
In getOfflineReplicas(), now return a MultiDomainServerState.
ChangeNumberIndexer.java:
Consequence of the changes to ChangelogState.
ReplicationDbEnv.java:
Added changelogState field, updated at the same time as the on-disk changelogstate DB
Added getChangelogState(), called by client code instead of readChangelogState().
Renamed readChangelogState() to private readOnDiskChangelogState().
Added stateLock field to sync updates to in-memory and on-disk changelog state.
ReplicationEnvironment.java:
Added changelogState field, updated at the same time as the on-disk changelogstate DB
Added getChangelogState(), called by client code instead of readChangelogState().
Renamed readChangelogState() to private readOnDiskChangelogState().
Renamed domainLock field to domainsLock.
replication.properties:
Removed now unused error message.
FileChangelogDB.java, JEChangelogDB.java:
Consequence of the changes to ChangelogState and ReplicationEnvironment/ReplicationDbEnv.
MultiDomainServerState.java, ServerState.java:
Added getSnapshot() for unit tests.
ReplicationEnvironmentTest.java, ReplicationDbEnvTest.java:
Consequence of the changes to ReplicationEnvironment and ChangelogState.
Used the fake server.
| | |
| | | change %s to replicaDB %s %s because: %s |
| | | SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \ |
| | | change %s to replicaDB %s %s because flushing thread is shutting down |
| | | SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \ |
| | | state from root path '%s' : directory might not exist |
| | | SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \ |
| | | changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \ |
| | | list of server ids |
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.TreeMap; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this object. |
| | | * |
| | | * @return an unmodifiable Map representing a snapshot of this object. |
| | | */ |
| | | public Map<DN, List<CSN>> getSnapshot() |
| | | { |
| | | if (list.isEmpty()) |
| | | { |
| | | return Collections.emptyMap(); |
| | | } |
| | | final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>(); |
| | | for (Entry<DN, ServerState> entry : list.entrySet()) |
| | | { |
| | | final List<CSN> l = entry.getValue().getSnapshot(); |
| | | if (!l.isEmpty()) |
| | | { |
| | | map.put(entry.getKey(), l); |
| | | } |
| | | } |
| | | return Collections.unmodifiableMap(map); |
| | | } |
| | | |
| | | /** |
| | | * Returns a string representation of this object. |
| | | * |
| | | * @return The string representation. |
| | | */ |
| | | @Override |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this object. |
| | | * |
| | | * @return an unmodifiable List representing a snapshot of this object. |
| | | */ |
| | | public List<CSN> getSnapshot() |
| | | { |
| | | if (serverIdToCSN.isEmpty()) |
| | | { |
| | | return Collections.emptyList(); |
| | | } |
| | | return Collections.unmodifiableList(new ArrayList<CSN>(serverIdToCSN.values())); |
| | | } |
| | | |
| | | /** |
| | | * Return the text representation of ServerState. |
| | | * @return the text representation of ServerState |
| | | */ |
| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.HashSet; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | * <p> |
| | | * This class is used during replication initialization to decouple the code |
| | | * that reads the changelogStateDB from the code that makes use of its data. |
| | | * |
| | | * @ThreadSafe |
| | | */ |
| | | public class ChangelogState |
| | | { |
| | | |
| | | private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>(); |
| | | private final Map<DN, List<Integer>> domainToServerIds = |
| | | new HashMap<DN, List<Integer>>(); |
| | | private final Map<DN, List<CSN>> offlineReplicas = |
| | | new HashMap<DN, List<CSN>>(); |
| | | private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>(); |
| | | private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds = |
| | | new ConcurrentSkipListMap<DN, Set<Integer>>(); |
| | | private final MultiDomainServerState offlineReplicas = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Sets the generationId for the supplied replication domain. |
| | |
| | | */ |
| | | public void addServerIdToDomain(int serverId, DN baseDN) |
| | | { |
| | | List<Integer> serverIds = domainToServerIds.get(baseDN); |
| | | Set<Integer> serverIds = domainToServerIds.get(baseDN); |
| | | if (serverIds == null) |
| | | { |
| | | serverIds = new LinkedList<Integer>(); |
| | | domainToServerIds.put(baseDN, serverIds); |
| | | serverIds = new HashSet<Integer>(); |
| | | final Set<Integer> existingServerIds = |
| | | domainToServerIds.putIfAbsent(baseDN, serverIds); |
| | | if (existingServerIds != null) |
| | | { |
| | | serverIds = existingServerIds; |
| | | } |
| | | } |
| | | serverIds.add(serverId); |
| | | } |
| | |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | { |
| | | List<CSN> offlineCSNs = offlineReplicas.get(baseDN); |
| | | if (offlineCSNs == null) |
| | | offlineReplicas.update(baseDN, offlineCSN); |
| | | } |
| | | |
| | | /** |
| | | * Removes the following replica information from the offline list. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the offline replica |
| | | * @param serverId |
| | | * the serverId that is not offline anymore |
| | | */ |
| | | public void removeOfflineReplica(DN baseDN, int serverId) |
| | | { |
| | | CSN csn; |
| | | do |
| | | { |
| | | offlineCSNs = new LinkedList<CSN>(); |
| | | offlineReplicas.put(baseDN, offlineCSNs); |
| | | csn = offlineReplicas.getCSN(baseDN, serverId); |
| | | } |
| | | offlineCSNs.add(offlineCSN); |
| | | while (csn != null && !offlineReplicas.removeCSN(baseDN, csn)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return a Map of domainBaseDN => List<serverId>. |
| | | */ |
| | | public Map<DN, List<Integer>> getDomainToServerIds() |
| | | public Map<DN, Set<Integer>> getDomainToServerIds() |
| | | { |
| | | return domainToServerIds; |
| | | } |
| | | |
| | | /** |
| | | * Returns the Map of domainBaseDN => List<offlineCSN>. |
| | | * Returns the internal MultiDomainServerState for offline replicas. |
| | | * |
| | | * @return a Map of domainBaseDN => List<offlineCSN>. |
| | | * @return the MultiDomainServerState for offline replicas. |
| | | */ |
| | | public Map<DN, List<CSN>> getOfflineReplicas() |
| | | public MultiDomainServerState getOfflineReplicas() |
| | | { |
| | | return offlineReplicas; |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the current ChangelogState is equal to the provided |
| | | * ChangelogState. |
| | | * <p> |
| | | * Note: Only use for tests!!<br> |
| | | * This method should only be used by tests because it creates a lot of |
| | | * intermediate objects which is not suitable for production. |
| | | * |
| | | * @param other |
| | | * the ChangelogState to compare with |
| | | * @return true if the current ChangelogState is equal to the provided |
| | | * ChangelogState, false otherwise. |
| | | */ |
| | | public boolean isEqualTo(ChangelogState other) |
| | | { |
| | | if (other == null) |
| | | { |
| | | return false; |
| | | } |
| | | if (this == other) |
| | | { |
| | | return true; |
| | | } |
| | | return domainToGenerationId.equals(other.domainToGenerationId) |
| | | && domainToServerIds.equals(other.domainToServerIds) |
| | | // Note: next line is not suitable for production |
| | | // because it creates lots of Lists and Maps |
| | | && offlineReplicas.getSnapshot().equals(other.offlineReplicas.getSnapshot()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.readChangelogState(); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(replicationEnv.readChangelogState()); |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = replicationEnv.readChangelogState(); |
| | | final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | for (CSN offlineCSN : domainState) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | |
| | | |
| | | /** Root path where the replication log is stored. */ |
| | | private final String replicationRootPath; |
| | | /** |
| | | * The current changelogState. This is in-memory version of what is inside the |
| | | * on-disk changelogStateDB. It improves performances in case the |
| | | * changelogState is read often. |
| | | * |
| | | * @GuardedBy("domainsLock") |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | |
| | | /** The list of logs that are in use. */ |
| | | private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>(); |
| | | |
| | | /** Maps each domain DN to a domain id that is used to name directory in file system. */ |
| | | /** |
| | | * Maps each domain DN to a domain id that is used to name directory in file system. |
| | | * |
| | | * @GuardedBy("domainsLock") |
| | | */ |
| | | private final Map<DN, String> domains = new HashMap<DN, String>(); |
| | | |
| | | /** Exclusive lock to guard the domains mapping and change of state to a domain.*/ |
| | | private final Object domainLock = new Object(); |
| | | /** |
| | | * Exclusive lock to synchronize: |
| | | * <ul> |
| | | * <li>the domains mapping</li> |
| | | * <li>changes to the in-memory changelogState</li> |
| | | * <li>changes to the on-disk state of a domain</li> |
| | | */ |
| | | private final Object domainsLock = new Object(); |
| | | |
| | | /** The underlying replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | |
| | | { |
| | | this.replicationRootPath = rootPath; |
| | | this.replicationServer = replicationServer; |
| | | this.changelogState = readOnDiskChangelogState(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the state of the replication changelog, which includes the list of |
| | | * known servers and the generation id. |
| | | * Returns the state of the replication changelog. |
| | | * |
| | | * @return the {@link ChangelogState} |
| | | * @return the {@link ChangelogState} read from the changelogState DB |
| | | * @throws ChangelogException |
| | | * if a problem occurs while retrieving the state. |
| | | * if a database problem occurs |
| | | */ |
| | | ChangelogState readChangelogState() throws ChangelogException |
| | | ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | final ChangelogState state = new ChangelogState(); |
| | | final File changelogPath = new File(replicationRootPath); |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | readDomainsStateFile(); |
| | | checkDomainDirectories(changelogPath); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the current state of the replication changelog. |
| | | * |
| | | * @return the current {@link ChangelogState} |
| | | */ |
| | | ChangelogState getChangelogState() |
| | | { |
| | | return changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the log used to store changes from the replication server |
| | | * with the given serverId and the given baseDN. |
| | | * |
| | |
| | | ensureRootDirectoryExists(); |
| | | |
| | | String domainId = null; |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | |
| | | |
| | | final File serverIdPath = getServerIdPath(domainId, serverId); |
| | | ensureServerIdDirectoryExists(serverIdPath); |
| | | changelogState.addServerIdToDomain(serverId, domainDN); |
| | | |
| | | final File generationIdPath = getGenerationIdPath(domainId, generationId); |
| | | ensureGenerationIdFileExists(generationIdPath); |
| | | changelogState.setDomainGenerationId(domainDN, generationId); |
| | | |
| | | return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER); |
| | | } |
| | |
| | | */ |
| | | void clearGenerationId(final DN domainDN) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File idFile = retrieveGenerationIdFile(getDomainPath(domainId)); |
| | | if (idFile != null) |
| | |
| | | ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString())); |
| | | } |
| | | } |
| | | changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | void resetGenerationId(final DN baseDN) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | clearGenerationId(baseDN); |
| | | final String domainId = domains.get(baseDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID); |
| | | ensureGenerationIdFileExists(generationIdPath); |
| | | changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); |
| | | if (!serverIdPath.exists()) |
| | |
| | | // Overwrite file, only the last sent offline CSN is kept |
| | | writer = newFileWriter(offlineFile); |
| | | writer.write(offlineCSN.toString()); |
| | | changelogState.addOfflineReplica(domainDN, offlineCSN); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | */ |
| | | void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException |
| | | { |
| | | synchronized (domainLock) |
| | | synchronized (domainsLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME); |
| | | if (offlineFile.exists()) |
| | |
| | | offlineFile.getPath(), domainDN.toString(), serverId)); |
| | | } |
| | | } |
| | | changelogState.removeOfflineReplica(domainDN, serverId); |
| | | } |
| | | } |
| | | |
| | |
| | | private void checkDomainDirectories(final File changelogPath) throws ChangelogException |
| | | { |
| | | final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER); |
| | | if (dnDirectories == null) |
| | | if (dnDirectories != null) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath)); |
| | | } |
| | | final Set<String> domainIdsFromFileSystem = new HashSet<String>(); |
| | | for (final File dnDir : dnDirectories) |
| | | { |
| | | final String fileName = dnDir.getName(); |
| | | final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length()); |
| | | domainIdsFromFileSystem.add(domainId); |
| | | } |
| | | |
| | | Set<String> domainIdsFromFileSystem = new HashSet<String>(); |
| | | for (final File dnDir : dnDirectories) |
| | | { |
| | | final String fileName = dnDir.getName(); |
| | | final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length()); |
| | | domainIdsFromFileSystem.add(domainId); |
| | | } |
| | | |
| | | Set<String> expectedDomainIds = new HashSet<String>(domains.values()); |
| | | if (!domainIdsFromFileSystem.equals(expectedDomainIds)) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(), |
| | | domainIdsFromFileSystem.toString())); |
| | | final Set<String> expectedDomainIds = new HashSet<String>(domains.values()); |
| | | if (!domainIdsFromFileSystem.equals(expectedDomainIds)) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(), |
| | | domainIdsFromFileSystem.toString())); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | |
| | | // initialize the DB cursor and the last seen updates |
| | | // to ensure the medium consistency CSN can move forward |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | for (Entry<DN, List<Integer>> entry |
| | | : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas() |
| | | .entrySet()) |
| | | final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); |
| | | for (DN baseDN : offlineReplicas) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | final List<CSN> offlineCSNs = entry.getValue(); |
| | | for (CSN offlineCSN : offlineCSNs) |
| | | for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) |
| | | { |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.readChangelogState()); |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = dbEnv.readChangelogState(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | for (CSN offlineCSN : domainState) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | |
| | | { |
| | | private Environment dbEnvironment; |
| | | private Database changelogStateDb; |
| | | /** |
| | | * The current changelogState. This is in-memory version of what is inside the |
| | | * on-disk changelogStateDB. It improves performances in case the |
| | | * changelogState is read often. |
| | | * |
| | | * @GuardedBy("stateLock") |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */ |
| | | private final Object stateLock = new Object(); |
| | | private final List<Database> allDbs = new CopyOnWriteArrayList<Database>(); |
| | | private ReplicationServer replicationServer; |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | |
| | | * of all the servers that have been seen in the past. |
| | | */ |
| | | changelogStateDb = openDatabase("changelogstate"); |
| | | changelogState = readOnDiskChangelogState(); |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Read and return the list of known servers from the database. |
| | | * Return the current changelog state. |
| | | * |
| | | * @return the current {@link ChangelogState} |
| | | */ |
| | | public ChangelogState getChangelogState() |
| | | { |
| | | return changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Read and return the changelog state from the database. |
| | | * |
| | | * @return the {@link ChangelogState} read from the changelogState DB |
| | | * @throws ChangelogException |
| | | * if a database problem occurs |
| | | */ |
| | | public ChangelogState readChangelogState() throws ChangelogException |
| | | protected ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | return decodeChangelogState(readWholeState()); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | synchronized (stateLock) |
| | | { |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | changelogState.addServerIdToDomain(serverId, baseDN); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | changelogState.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | return replicaDB; |
| | | } |
| | | catch (RuntimeException e) |
| | |
| | | */ |
| | | public void clearGenerationId(DN baseDN) throws ChangelogException |
| | | { |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | changelogState.setDomainGenerationId(baseDN, unusedGenId); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | changelogState.setDomainGenerationId(baseDN, -1); |
| | | } |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | changelogState.addOfflineReplica(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), |
| | | "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), |
| | | "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); |
| | | changelogState.removeOfflineReplica(baseDN, serverId); |
| | | } |
| | | } |
| | | |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; |
| | | |
| | | import java.io.File; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class ReplicationEnvironmentTest extends DirectoryServerTestCase |
| | | { |
| | |
| | | public void setUp() throws Exception |
| | | { |
| | | // This test suite depends on having the schema available for DN decoding. |
| | | TestCaseUtils.startServer(); |
| | | TestCaseUtils.startFakeServer(); |
| | | } |
| | | |
| | | @AfterClass |
| | | public void tearDown() throws Exception |
| | | { |
| | | TestCaseUtils.shutdownFakeServer(); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | |
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN); |
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); |
| | | |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | |
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2)); |
| | | for (int i = 0; i <= 2 ; i++) |
| | |
| | | MapEntry.entry(domainDNs.get(0), 1L), |
| | | MapEntry.entry(domainDNs.get(1), 2L), |
| | | MapEntry.entry(domainDNs.get(2), 3L)); |
| | | |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | | environment.notifyReplicaOffline(domainDN, offlineCSN); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | |
| | | assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); |
| | | assertThat(state.getOfflineReplicas().getSnapshot()) |
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); |
| | | |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME); |
| | | offlineStateFile.createNewFile(); |
| | | |
| | | environment.readChangelogState(); |
| | | environment.readOnDiskChangelogState(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | CSN lastOfflineCSN = csnGenerator.newCSN(); |
| | | environment.notifyReplicaOffline(domainDN, lastOfflineCSN); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | |
| | | assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN))); |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | assertThat(state.getOfflineReplicas().getSnapshot()) |
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN))); |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | // put server id 1 online again |
| | | environment.notifyReplicaOnline(domainDN, SERVER_ID_1); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | assertThat(state.getOfflineReplicas()).isEmpty(); |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | | environment.notifyReplicaOffline(domainDN, offlineCSN); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | final ChangelogState state = environment.readOnDiskChangelogState(); |
| | | |
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN); |
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); |
| | | assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); |
| | | assertThat(state.getOfflineReplicas().getSnapshot()) |
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN))); |
| | | |
| | | assertThat(state).isEqualTo(environment.getChangelogState()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | // consistency with domain state file |
| | | StaticUtils.recursiveDelete(new File(rootPath, "1.domain")); |
| | | |
| | | environment.readChangelogState(); |
| | | environment.readOnDiskChangelogState(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.HashSet; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected Database openDatabase(String databaseName) |
| | | throws ChangelogException, RuntimeException |
| | | protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | @Override |
| | | protected ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | return new ChangelogState(); |
| | | } |
| | | } |
| | | |
| | | @BeforeClass |
| | |
| | | entry(baseDN, generationId)); |
| | | if (!replicas.isEmpty()) |
| | | { |
| | | assertThat(state.getDomainToServerIds()).containsExactly( |
| | | entry(baseDN, replicas)); |
| | | assertThat(state.getDomainToServerIds()) |
| | | .containsExactly(entry(baseDN, new HashSet<Integer>(replicas))); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | if (!offlineReplicas.isEmpty()) |
| | | { |
| | | assertThat(state.getOfflineReplicas()).containsExactly( |
| | | entry(baseDN, offlineReplicas)); |
| | | assertThat(state.getOfflineReplicas().getSnapshot()) |
| | | .containsExactly(entry(baseDN, offlineReplicas)); |
| | | } |
| | | else |
| | | { |