| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.TreeMap; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this object. |
| | | * |
| | | * @return an unmodifiable Map representing a snapshot of this object. |
| | | */ |
| | | public Map<DN, List<CSN>> getSnapshot() |
| | | { |
| | | if (list.isEmpty()) |
| | | { |
| | | return Collections.emptyMap(); |
| | | } |
| | | final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>(); |
| | | for (Entry<DN, ServerState> entry : list.entrySet()) |
| | | { |
| | | final List<CSN> l = entry.getValue().getSnapshot(); |
| | | if (!l.isEmpty()) |
| | | { |
| | | map.put(entry.getKey(), l); |
| | | } |
| | | } |
| | | return Collections.unmodifiableMap(map); |
| | | } |
| | | |
| | | /** |
| | | * Returns a string representation of this object. |
| | | * |
| | | * @return The string representation. |
| | | */ |
| | | @Override |
| | |
| | | } |
| | | |
| | | /** |
| | | * Copies the CSNs currently held by this object into a read-only list. |
| | | * |
| | | * @return an unmodifiable List holding a copy of the CSN values; an empty |
| | | * List when this object holds none. |
| | | */ |
| | | public List<CSN> getSnapshot() |
| | | { |
| | | if (!serverIdToCSN.isEmpty()) |
| | | { |
| | | final List<CSN> copy = new ArrayList<CSN>(serverIdToCSN.values()); |
| | | return Collections.unmodifiableList(copy); |
| | | } |
| | | return Collections.emptyList(); |
| | | } |
| | | |
| | | /** |
| | | * Return the text representation of ServerState. |
| | | * @return the text representation of ServerState |
| | | */ |
| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.HashSet; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | * <p> |
| | | * This class is used during replication initialization to decouple the code |
| | | * that reads the changelogStateDB from the code that makes use of its data. |
| | | * |
| | | * @ThreadSafe |
| | | */ |
| | | public class ChangelogState |
| | | { |
| | | |
| | | /** Generation id recorded for each replication domain, keyed by baseDN. */ |
| | | private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>(); |
| | | /** Server ids recorded for each replication domain, keyed by baseDN. */ |
| | | private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds = |
| | | new ConcurrentSkipListMap<DN, Set<Integer>>(); |
| | | /** Offline CSNs recorded for the replicas of each replication domain. */ |
| | | private final MultiDomainServerState offlineReplicas = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Sets the generationId for the supplied replication domain. |
| | |
| | | */ |
| | | public void addServerIdToDomain(int serverId, DN baseDN) |
| | | { |
| | | Set<Integer> serverIds = domainToServerIds.get(baseDN); |
| | | if (serverIds == null) |
| | | { |
| | | // First server id seen for this domain. The set is reachable by other |
| | | // threads through the map, so publish a concurrent set rather than a |
| | | // plain HashSet. |
| | | serverIds = new java.util.concurrent.ConcurrentSkipListSet<Integer>(); |
| | | final Set<Integer> existingServerIds = |
| | | domainToServerIds.putIfAbsent(baseDN, serverIds); |
| | | if (existingServerIds != null) |
| | | { |
| | | // another thread registered a set for this domain first: use theirs |
| | | serverIds = existingServerIds; |
| | | } |
| | | } |
| | | serverIds.add(serverId); |
| | | } |
| | |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | { |
| | | // Always record the CSN: the per-domain bookkeeping is delegated to the |
| | | // MultiDomainServerState, so no lookup is needed beforehand. |
| | | offlineReplicas.update(baseDN, offlineCSN); |
| | | } |
| | | |
| | | /** |
| | | * Removes the following replica information from the offline list. |
| | | * <p> |
| | | * The CSN currently recorded for the replica is looked up and then removed. |
| | | * The lookup is repeated for as long as a CSN is found but its removal is |
| | | * reported as unsuccessful. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the offline replica |
| | | * @param serverId |
| | | * the serverId that is not offline anymore |
| | | */ |
| | | public void removeOfflineReplica(DN baseDN, int serverId) |
| | | { |
| | | CSN csn; |
| | | do |
| | | { |
| | | csn = offlineReplicas.getCSN(baseDN, serverId); |
| | | } |
| | | // NOTE(review): presumably removeCSN() returns false when the recorded CSN |
| | | // changed between the lookup and the removal - confirm against |
| | | // MultiDomainServerState. |
| | | while (csn != null && !offlineReplicas.removeCSN(baseDN, csn)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return a Map of domainBaseDN => Set<serverId>. |
| | | */ |
| | | public Map<DN, Set<Integer>> getDomainToServerIds() |
| | | { |
| | | // hands out the live internal map, not a copy |
| | | return domainToServerIds; |
| | | } |
| | | |
| | | /** |
| | | * Returns the internal MultiDomainServerState for offline replicas. |
| | | * <p> |
| | | * The returned object is the live instance updated by |
| | | * {@link #addOfflineReplica(DN, CSN)} and |
| | | * {@link #removeOfflineReplica(DN, int)}, not a copy. |
| | | * |
| | | * @return the MultiDomainServerState for offline replicas. |
| | | */ |
| | | public MultiDomainServerState getOfflineReplicas() |
| | | { |
| | | return offlineReplicas; |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the provided ChangelogState holds the same content as this |
| | | * one. |
| | | * <p> |
| | | * Note: Only use for tests!!<br> |
| | | * Comparing the offline replicas builds a snapshot of both objects, which |
| | | * creates a lot of intermediate objects and is not suitable for production. |
| | | * |
| | | * @param other |
| | | * the ChangelogState to compare with |
| | | * @return true if the generation ids, the server ids and the offline replicas |
| | | * of both objects are equal, false otherwise (including when the |
| | | * provided ChangelogState is null). |
| | | */ |
| | | public boolean isEqualTo(ChangelogState other) |
| | | { |
| | | if (this == other) |
| | | { |
| | | return true; |
| | | } |
| | | if (other == null) |
| | | { |
| | | return false; |
| | | } |
| | | if (!domainToGenerationId.equals(other.domainToGenerationId)) |
| | | { |
| | | return false; |
| | | } |
| | | if (!domainToServerIds.equals(other.domainToServerIds)) |
| | | { |
| | | return false; |
| | | } |
| | | // Note: the snapshots below are not suitable for production |
| | | // because they create lots of Lists and Maps |
| | | final Map<DN, List<CSN>> mine = offlineReplicas.getSnapshot(); |
| | | final Map<DN, List<CSN>> theirs = other.offlineReplicas.getSnapshot(); |
| | | return mine.equals(theirs); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.*; |
| | | import java.util.Collections; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | |
| | | // initialize the DB cursor and the last seen updates |
| | | // to ensure the medium consistency CSN can move forward |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | for (Entry<DN, List<Integer>> entry |
| | | : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas() |
| | | .entrySet()) |
| | | final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); |
| | | for (DN baseDN : offlineReplicas) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | final List<CSN> offlineCSNs = entry.getValue(); |
| | | for (CSN offlineCSN : offlineCSNs) |
| | | for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) |
| | | { |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.readChangelogState()); |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = dbEnv.readChangelogState(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | for (CSN offlineCSN : domainState) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | |
| | | { |
| | | private Environment dbEnvironment; |
| | | private Database changelogStateDb; |
| | | /** |
| | | * The current changelogState. This is in-memory version of what is inside the |
| | | * on-disk changelogStateDB. It improves performances in case the |
| | | * changelogState is read often. |
| | | * |
| | | * @GuardedBy("stateLock") |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */ |
| | | private final Object stateLock = new Object(); |
| | | private final List<Database> allDbs = new CopyOnWriteArrayList<Database>(); |
| | | private ReplicationServer replicationServer; |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | |
| | | * of all the servers that have been seen in the past. |
| | | */ |
| | | changelogStateDb = openDatabase("changelogstate"); |
| | | changelogState = readOnDiskChangelogState(); |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the current changelog state held in memory by this object. |
| | | * <p> |
| | | * The same {@link ChangelogState} instance is returned on every call: no |
| | | * copy is made and this accessor does not acquire {@code stateLock}. |
| | | * |
| | | * @return the current {@link ChangelogState} |
| | | */ |
| | | public ChangelogState getChangelogState() |
| | | { |
| | | return changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Read and return the changelog state from the database. |
| | | * <p> |
| | | * The whole content of the changelogState DB is read and then decoded into |
| | | * a new {@link ChangelogState}. |
| | | * |
| | | * @return the {@link ChangelogState} read from the changelogState DB |
| | | * @throws ChangelogException |
| | | * if a database problem occurs |
| | | */ |
| | | protected ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | return decodeChangelogState(readWholeState()); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | synchronized (stateLock) |
| | | { |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | changelogState.addServerIdToDomain(serverId, baseDN); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | changelogState.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | return replicaDB; |
| | | } |
| | | catch (RuntimeException e) |
| | |
| | | */ |
| | | public void clearGenerationId(DN baseDN) throws ChangelogException |
| | | { |
| | | // hold stateLock so the on-disk record and the in-memory state change together |
| | | synchronized (stateLock) |
| | | { |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | changelogState.setDomainGenerationId(baseDN, unusedGenId); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | // hold stateLock so the on-disk record and the in-memory state change together |
| | | synchronized (stateLock) |
| | | { |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | // NOTE(review): this resets the generation id of the domain in memory and |
| | | // leaves the in-memory server ids of the domain untouched - confirm this |
| | | // is what is intended when clearing a single serverId. |
| | | changelogState.setDomainGenerationId(baseDN, -1); |
| | | } |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | // hold stateLock so the on-disk record and the in-memory state change together |
| | | synchronized (stateLock) |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | changelogState.addOfflineReplica(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.HashSet; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException |
| | | { |
| | | // this override opens nothing and always returns null |
| | | return null; |
| | | } |
| | | |
| | | @Override |
| | | protected ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | return new ChangelogState(); |
| | | } |
| | | } |
| | | |
| | | @BeforeClass |
| | |
| | | entry(baseDN, generationId)); |
| | | if (!replicas.isEmpty()) |
| | | { |
| | | assertThat(state.getDomainToServerIds()).containsExactly( |
| | | entry(baseDN, replicas)); |
| | | assertThat(state.getDomainToServerIds()) |
| | | .containsExactly(entry(baseDN, new HashSet<Integer>(replicas))); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | if (!offlineReplicas.isEmpty()) |
| | | { |
| | | assertThat(state.getOfflineReplicas()).containsExactly( |
| | | entry(baseDN, offlineReplicas)); |
| | | assertThat(state.getOfflineReplicas().getSnapshot()) |
| | | .containsExactly(entry(baseDN, offlineReplicas)); |
| | | } |
| | | else |
| | | { |