| | |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | */ |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param startAfterState |
| | | * Starting point for each domain cursor. If the {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN of each of |
| | | * that domain's replicaDBs |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param startAfterServerState |
| | | * @param startAfterState |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Unregisters the provided cursor from this replication domain. |
| | | * |
| | | * @param cursor |
| | | * the cursor to unregister. |
| | | */ |
| | | void unregisterCursor(DBCursor<?> cursor); |
| | | |
| | | /** |
| | | * Publishes the provided change to the changelog DB for the specified |
| | | * serverId and replication domain. After a change has been successfully |
| | | * published, it becomes available to be returned by the External ChangeLog. |
| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.CompositeDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.DomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | |
| | | */ |
| | | public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB |
| | | { |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a FileReplicaDB, synchronize on the domainMap to avoid |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>(); |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private ReplicationEnvironment replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | throws ConfigException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the FileReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | |
| | | final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | final Boolean dbWasCreated = result.getSecond(); |
| | | if (dbWasCreated) |
| | | { // new replicaDB => update all cursors with it |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (DomainDBCursor cursor : cursors) |
| | | { |
| | | cursor.addReplicaDB(serverId, null); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return result; |
| | | } |
| | | } |
| | |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the FileReplicaDB already exists |
| | | // happy path: the replicaDB already exists |
| | | FileReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the FileReplicaDB does not exist: take the hit and synchronize |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the FileReplicaDB |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | return null; |
| | | } |
| | | |
| | |
| | | { |
| | | // do nothing: we are already shutting down |
| | | } |
| | | |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears all records from the changelog (does not remove the log itself). |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs when clearing the log. |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | { |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | for (CSN offlineCSN : domainState) |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | cursors = new ArrayList<DomainDBCursor>(); |
| | | registeredDomainCursors.put(baseDN, cursors); |
| | | } |
| | | cursors.add(cursor); |
| | | return cursor; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Looks up the offline CSN recorded in the changelog state for one replica. |
| | |  * |
| | |  * @param baseDN |
| | |  *          the replication domain baseDN |
| | |  * @param serverId |
| | |  *          the serverId of the replica |
| | |  * @param startAfterCSN |
| | |  *          the CSN after which the caller starts reading, may be null |
| | |  * @return the offline CSN when one is recorded and {@code startAfterCSN} is |
| | |  *         either null or older than it, null otherwise |
| | |  */ |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | |   final CSN replicaOfflineCSN = |
| | |       replicationEnv.getChangelogState().getOfflineReplicas().getCSN(baseDN, serverId); |
| | |   if (replicaOfflineCSN == null) |
| | |   { |
| | |     return null; |
| | |   } |
| | |   final boolean startsBeforeOffline = |
| | |       startAfterCSN == null || startAfterCSN.isOlderThan(replicaOfflineCSN); |
| | |   return startsBeforeOffline ? replicaOfflineCSN : null; |
| | | } |
| | |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | | } |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | /** |
| | |  * {@inheritDoc} |
| | |  * <p> |
| | |  * Multi-domain cursors are removed from the multi-domain registry, domain |
| | |  * cursors from the per-baseDN registry. Cursors of any other type are |
| | |  * ignored. |
| | |  */ |
| | | @Override |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | | { |
| | |   if (cursor instanceof MultiDomainDBCursor) |
| | |   { |
| | |     registeredMultiDomainCursors.remove(cursor); |
| | |   } |
| | |   else if (cursor instanceof DomainDBCursor) |
| | |   { |
| | |     final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | |     // registeredDomainCursors is guarded by its own monitor (the same one |
| | |     // taken when a domain cursor is registered); locking any other object |
| | |     // here would leave the map and its lists unprotected. |
| | |     synchronized (registeredDomainCursors) |
| | |     { |
| | |       final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | |       if (cursors != null) |
| | |       { |
| | |         cursors.remove(cursor); |
| | |       } |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | updateMsg.getCSN().getServerId(), replicationServer); |
| | | csn.getServerId(), replicationServer); |
| | | final FileReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | |
| | | replicaDB.add(updateMsg); |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); |
| | | notifyReplicaOnline(indexer, baseDN, csn.getServerId()); |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | | return pair.getSecond(); // replica DB was created |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * Thread responsible for inserting replicated changes into the ChangeNumber |
| | |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | */ |
| | |
| | | * |
| | | * @NonNull |
| | | */ |
| | | @SuppressWarnings("unchecked") |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor = |
| | | new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | | * i.e. from the same thread that will make use of them. If this rule is not |
| | | * obeyed, then a JE exception will be thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** |
| | | * Holds the newCursors that will have to be created in the next iteration |
| | | * inside the {@link #run()} method. |
| | | * <p> |
| | | * This map can be updated by multiple threads. |
| | | */ |
| | | private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( |
| | | new Comparator<Pair<DN, Integer>>() |
| | | { |
| | | @Override |
| | | public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2) |
| | | { |
| | | final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); |
| | | if (compareBaseDN == 0) |
| | | { |
| | | return o1.getSecond().compareTo(o2.getSecond()); |
| | | } |
| | | return compareBaseDN; |
| | | } |
| | | }); |
| | | private MultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | return; |
| | | } |
| | | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | lastAliveCSNs.update(baseDN, updateMsg.getCSN()); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | continue; |
| | | } |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | } |
| | | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | |
| | | return new CSN(0, 0, serverId); |
| | | } |
| | | |
| | | /** |
| | |  * Rebuilds the composite cursor from every cursor currently held in |
| | |  * {@code allCursors}, positions it on its first record and installs it as |
| | |  * {@code nextChangeForInsertDBCursor}. |
| | |  * |
| | |  * @throws ChangelogException |
| | |  *           if positioning the new composite cursor fails |
| | |  */ |
| | | private void resetNextChangeForInsertDBCursor() throws ChangelogException |
| | | { |
| | |   final Map<DBCursor<UpdateMsg>, DN> cursorToBaseDN = new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | |   for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> domainEntry : this.allCursors.entrySet()) |
| | |   { |
| | |     final DN baseDN = domainEntry.getKey(); |
| | |     for (DBCursor<UpdateMsg> replicaCursor : domainEntry.getValue().values()) |
| | |     { |
| | |       cursorToBaseDN.put(replicaCursor, baseDN); |
| | |     } |
| | |   } |
| | | |
| | |   // CNIndexer manages the cursor itself, |
| | |   // so do not try to recycle exhausted cursors |
| | |   final CompositeDBCursor<DN> composite = new CompositeDBCursor<DN>(cursorToBaseDN, false); |
| | |   composite.next(); |
| | |   nextChangeForInsertDBCursor = composite; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | | { |
| | | map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>(); |
| | | allCursors.put(baseDN, map); |
| | | } |
| | | DBCursor<UpdateMsg> cursor = map.get(serverId); |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | cursor.next(); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | |  * Computes the CSN that comes immediately before the provided one for the |
| | |  * same serverId: the sequence number is decremented when it is positive, |
| | |  * otherwise the time is decremented and the sequence number is set to |
| | |  * {@link Integer#MAX_VALUE}. |
| | |  * |
| | |  * @param csn |
| | |  *          the CSN to use |
| | |  * @return the immediately preceding CSN or null if the provided CSN is null. |
| | |  */ |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | |   if (csn == null) |
| | |   { |
| | |     return null; |
| | |   } |
| | |   if (csn.getSeqnum() <= 0) |
| | |   { |
| | |     // no lower sequence number at this time: step back in time |
| | |     return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); |
| | |   } |
| | |   return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | |
| | | { |
| | | try |
| | | { |
| | | if (!domainsToClear.isEmpty()) |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | removeCursors(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | else |
| | | { |
| | | final boolean createdCursors = createNewCursors(); |
| | | final boolean recycledCursors = recycleExhaustedCursors(); |
| | | if (createdCursors || recycledCursors) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | nextChangeForInsertDBCursor.removeDomain(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | |
| | | // Do not call DBCursor.next() here |
| | | // because we might not have consumed the last record, |
| | | // for example if we could not move the MCP forward |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | // check whether new changes have been added to the ReplicaDBs |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | else if (msg instanceof ReplicaOfflineMsg) |
| | | { |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // Message logged here gives corrective information to the administrator. |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // Message logged here gives corrective information to the administrator. |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw new RuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | removeCursors(DN.NULL_DN); |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Nothing can be done about it. |
| | | * <p> |
| | | * Rely on the DirectoryThread uncaught exceptions handler for logging error + |
| | | * alert. |
| | | * <p> |
| | | * Message logged here gives corrective information to the administrator. |
| | | */ |
| | | private void logUnexpectedException(Exception e) |
| | | { |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | boolean callNextOnCursor = true; |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | pair = getCursor(mcBaseDN, mcCSN.getServerId()); |
| | | Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond(); |
| | | if (iter != null && !iter.hasNext()) |
| | | { |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | iter.remove(); |
| | | StaticUtils.close(pair.getFirst()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | callNextOnCursor = false; |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | // TODO JNR how to close cursor for offline replica? |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Closes and forgets the composite cursor, then closes and forgets the |
| | |  * cursors (existing and pending) of one domain, or of all domains when |
| | |  * {@link DN#NULL_DN} is provided. |
| | |  * |
| | |  * @param baseDN |
| | |  *          the domain whose cursors must be removed, or {@link DN#NULL_DN} |
| | |  *          to remove the cursors of every domain |
| | |  */ |
| | | private void removeCursors(DN baseDN) |
| | | { |
| | |   if (nextChangeForInsertDBCursor != null) |
| | |   { |
| | |     nextChangeForInsertDBCursor.close(); |
| | |     nextChangeForInsertDBCursor = null; |
| | |   } |
| | | |
| | |   if (!DN.NULL_DN.equals(baseDN)) |
| | |   { |
| | |     // close cursors for this DN only |
| | |     final Map<Integer, DBCursor<UpdateMsg>> domainCursors = allCursors.remove(baseDN); |
| | |     if (domainCursors != null) |
| | |     { |
| | |       StaticUtils.close(domainCursors.values()); |
| | |     } |
| | |     final Iterator<Pair<DN, Integer>> pendingKeys = newCursors.keySet().iterator(); |
| | |     while (pendingKeys.hasNext()) |
| | |     { |
| | |       if (pendingKeys.next().getFirst().equals(baseDN)) |
| | |       { |
| | |         pendingKeys.remove(); |
| | |       } |
| | |     } |
| | |     return; |
| | |   } |
| | | |
| | |   // close all cursors |
| | |   for (Map<Integer, DBCursor<UpdateMsg>> domainCursors : allCursors.values()) |
| | |   { |
| | |     StaticUtils.close(domainCursors.values()); |
| | |   } |
| | |   allCursors.clear(); |
| | |   newCursors.clear(); |
| | | } |
| | | |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | getCursor(final DN baseDN, final int serverId) throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | | { |
| | | if (baseDN.equals(entry1.getKey())) |
| | | { |
| | | for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (serverId == entry2.getKey()) |
| | | { |
| | | return Pair.of(entry2.getValue(), iter); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return Pair.empty(); |
| | | } |
| | | |
| | | /** |
| | |  * Calls {@code next()} on every held cursor that currently has no record, |
| | |  * to find out whether new changes became available for it. |
| | |  * |
| | |  * @return true if at least one such cursor moved to a new record |
| | |  * @throws ChangelogException |
| | |  *           if advancing a cursor fails |
| | |  */ |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | |   boolean recycledAny = false; |
| | |   for (Map<Integer, DBCursor<UpdateMsg>> domainCursors : allCursors.values()) |
| | |   { |
| | |     for (DBCursor<UpdateMsg> replicaCursor : domainCursors.values()) |
| | |     { |
| | |       if (replicaCursor.getRecord() != null) |
| | |       { |
| | |         // not exhausted: leave it where it is |
| | |         continue; |
| | |       } |
| | |       // try to recycle it by calling next() |
| | |       if (replicaCursor.next()) |
| | |       { |
| | |         recycledAny = true; |
| | |       } |
| | |     } |
| | |   } |
| | |   return recycledAny; |
| | | } |
| | | |
| | | private boolean createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = |
| | | newCursors.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | // start after preceding CSN so the first CSN read will exactly be the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | | iter.remove(); |
| | | } |
| | | return newCursorAdded; |
| | | } |
| | | return false; |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | * <p> |
| | | * New cursors for this Map must be created from the same thread that will |
| | | * make use of them. When this rule is not obeyed, a JE exception will be |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | private final TreeMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | |
| | | if (state == UNINITIALIZED) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. |
| | | state = READY; |
| | | } |
| | | else |
| | | { |
| | | // Previous state was READY => we must advance the first cursor |
| | | // because the UpdateMsg it is pointing has already been consumed. |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry(); |
| | | if (cursorToAdvance != null) |
| | | { |
| | | addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue()); |
| | | } |
| | | } |
| | | |
| | | recycleExhaustedCursors(); |
| | | removeNoLongerNeededCursors(); |
| | | incorporateNewCursors(); |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | /**
| | | * Gives the cursors held in {@code exhaustedCursors} another chance: the map
| | | * is copied and cleared before its entries are processed one by one.
| | | * <p>
| | | * NOTE(review): as shown, this body mixes statements that cannot all belong
| | | * to a single void method (for instance {@code return true;}) - confirm which
| | | * statements are current against the full file.
| | | *
| | | * @throws ChangelogException
| | | *           may be raised when advancing a cursor
| | | */
| | | private void recycleExhaustedCursors() throws ChangelogException
| | | {
| | | if (!exhaustedCursors.isEmpty())
| | | {
| | | // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
| | | // iterate over a copy: the live map is emptied first, presumably so that
| | | // cursors that are still exhausted can be stored in it again - TODO confirm
| | | final Map<DBCursor<UpdateMsg>, Data> copy =
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
| | | exhaustedCursors.clear();
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
| | | {
| | | entry.getKey().next();
| | | put(entry);
| | | }
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
| | | {
| | | // if the first cursor was previously an exhausted cursor,
| | | // then we have already called next() on it.
| | | // Avoid calling it again because we know new changes have been found.
| | | return true;
| | | addCursor(entry.getKey(), entry.getValue());
| | | }
| | | }
| | | 
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary
| | | // to remove and add again the cursor after moving it forward.
| | | if (advanceNonExhaustedCursors)
| | | {
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
| | | if (firstEntry != null)
| | | {
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
| | | cursor.next();
| | | put(firstEntry);
| | | }
| | | }
| | | // no cursors are left with changes.
| | | return !cursors.isEmpty();
| | | }
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | /**
| | | * Walks the active cursors and, for each one whose data is reported by
| | | * {@link #isCursorNoLongerNeededFor(Object)}, closes the cursor, removes it
| | | * from the map and then calls {@link #cursorRemoved(Object)}.
| | | * <p>
| | | * NOTE(review): the first statements of the body reference an {@code entry}
| | | * that is not declared at that point - confirm against the full file.
| | | */
| | | private void removeNoLongerNeededCursors()
| | | {
| | | final DBCursor<UpdateMsg> cursor = entry.getKey();
| | | final Data data = entry.getValue();
| | | if (cursor.getRecord() != null)
| | | // explicit iterator so that entries can be removed while traversing the map
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
| | | cursors.entrySet().iterator(); iterator.hasNext();)
| | | {
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
| | | final Data data = entry.getValue();
| | | if (isCursorNoLongerNeededFor(data))
| | | {
| | | // close first, then drop the entry, then notify the subclass
| | | entry.getKey().close();
| | | iterator.remove();
| | | cursorRemoved(data);
| | | }
| | | }
| | | }
| | | |
| | | /** |
| | | * Adds a cursor to this composite cursor. It first calls |
| | | * {@link DBCursor#next()} to verify whether it is exhausted or not. |
| | | * |
| | | * @param cursor |
| | | * the cursor to add to this composite |
| | | * @param data |
| | | * the data associated to the provided cursor |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | | this.cursors.put(cursor, data); |
| | | } |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | // Cannot call incorporateNewCursors() here because |
| | | // somebody might have already called DBCursor.getRecord() and read the record |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called when implementors should incorporate new cursors into the current |
| | | * composite DBCursor. Implementors should call |
| | | * {@link #addCursor(DBCursor, Object)} to do so. |
| | | * |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected abstract void incorporateNewCursors() throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns whether the cursor associated to the provided data should be removed. |
| | | * |
| | | * @param data the data associated to the cursor to be tested |
| | | * @return true if the cursor associated to the provided data should be removed, |
| | | * false otherwise |
| | | */ |
| | | protected abstract boolean isCursorNoLongerNeededFor(Data data); |
| | | |
| | | /** |
| | | * Notifies that the cursor associated to the provided data has been removed. |
| | | * |
| | | * @param data |
| | | * the data associated to the removed cursor |
| | | */ |
| | | protected abstract void cursorRemoved(Data data); |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | |
| | | @Override
| | | public void close()
| | | {
| | | // mark the composite as closed before releasing anything
| | | state = CLOSED;
| | | // hand both the active and the exhausted cursors to StaticUtils.close() ...
| | | StaticUtils.close(cursors.keySet());
| | | StaticUtils.close(exhaustedCursors.keySet());
| | | // ... then drop every reference to them
| | | cursors.clear();
| | | exhaustedCursors.clear();
| | | }
| | | |
| | | /** {@inheritDoc} */ |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /**
| | |  * Composite cursor walking the changes of the replica DBs that belong to a
| | |  * single replication domain.
| | |  */
| | | public class DomainDBCursor extends CompositeDBCursor<Void>
| | | {
| | | 
| | |   /**
| | |    * Stand-in stored instead of a null starting CSN, because
| | |    * ConcurrentSkipListMap does not accept null values.
| | |    */
| | |   private static final CSN NULL_CSN = new CSN(0, 0, 0);
| | | 
| | |   private final DN baseDN;
| | |   private final ReplicationDomainDB domainDB;
| | | 
| | |   /** Replicas queued by {@link #addReplicaDB(int, CSN)}, keyed by serverId. */
| | |   private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
| | |       new ConcurrentSkipListMap<Integer, CSN>();
| | | 
| | |   /**
| | |    * Creates a cursor for one replication domain.
| | |    *
| | |    * @param baseDN
| | |    *          the baseDN of the replication domain this cursor covers
| | |    * @param domainDB
| | |    *          the DB used to open replica cursors and to unregister this cursor
| | |    */
| | |   public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
| | |   {
| | |     this.baseDN = baseDN;
| | |     this.domainDB = domainDB;
| | |   }
| | | 
| | |   /**
| | |    * Returns the baseDN of the replication domain covered by this cursor.
| | |    *
| | |    * @return the replication domain baseDN given at construction time
| | |    */
| | |   public DN getBaseDN()
| | |   {
| | |     return baseDN;
| | |   }
| | | 
| | |   /**
| | |    * Queues a replicaDB for this cursor. The matching cursor is only opened
| | |    * when {@link #incorporateNewCursors()} runs.
| | |    *
| | |    * @param serverId
| | |    *          the serverId of the replica
| | |    * @param startAfterCSN
| | |    *          the CSN after which to start iterating, may be null
| | |    */
| | |   public void addReplicaDB(int serverId, CSN startAfterCSN)
| | |   {
| | |     final CSN storedCSN;
| | |     if (startAfterCSN == null)
| | |     {
| | |       storedCSN = NULL_CSN;
| | |     }
| | |     else
| | |     {
| | |       storedCSN = startAfterCSN;
| | |     }
| | |     // putIfAbsent: a starting point already queued for this replica is kept
| | |     newReplicas.putIfAbsent(serverId, storedCSN);
| | |   }
| | | 
| | |   /** {@inheritDoc} */
| | |   @Override
| | |   protected void incorporateNewCursors() throws ChangelogException
| | |   {
| | |     final Iterator<Entry<Integer, CSN>> pending = newReplicas.entrySet().iterator();
| | |     while (pending.hasNext())
| | |     {
| | |       final Entry<Integer, CSN> replica = pending.next();
| | |       final int serverId = replica.getKey();
| | |       // translate the stand-in back to the null it replaced
| | |       CSN startAfterCSN = replica.getValue();
| | |       if (NULL_CSN.equals(startAfterCSN))
| | |       {
| | |         startAfterCSN = null;
| | |       }
| | |       final DBCursor<UpdateMsg> replicaCursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
| | |       addCursor(replicaCursor, null);
| | |       // the entry is dropped only once its cursor has been added
| | |       pending.remove();
| | |     }
| | |   }
| | | 
| | |   /** {@inheritDoc} */
| | |   @Override
| | |   protected boolean isCursorNoLongerNeededFor(Void data)
| | |   {
| | |     // this implementation never asks for a cursor to be removed
| | |     return false;
| | |   }
| | | 
| | |   /** {@inheritDoc} */
| | |   @Override
| | |   protected void cursorRemoved(Void data)
| | |   {
| | |     // nothing to do
| | |   }
| | | 
| | |   /** {@inheritDoc} */
| | |   @Override
| | |   public void close()
| | |   {
| | |     super.close();
| | |     domainDB.unregisterCursor(this);
| | |     newReplicas.clear();
| | |   }
| | | 
| | | }
| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a JEReplicaDB, synchronize on the domainMap to avoid |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServerCfg config; |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new DBCursor<UpdateMsg>() |
| | | { |
| | | |
| | |
| | | }; |
| | | |
| | | /** |
| | | * Builds an instance of this class. |
| | | * Creates a new changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public JEChangelogDB(ReplicationServer replicationServer,
| | | ReplicationServerCfg config) throws ConfigException
| | | public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
| | | throws ConfigException
| | | {
| | | this.config = config;
| | | this.replicationServer = replicationServer;
| | | // the configured DB directory goes through makeDir(), which is the call
| | | // that can raise the declared ConfigException
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory());
| | | }
| | | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | | private File makeDir(final String dbDirName) throws ConfigException |
| | | { |
| | | // Check that this path exists or create it. |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | |
| | | final MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(e.getLocalizedMessage()); |
| | | mb.append(" "); |
| | | mb.append(String.valueOf(dbDirectory)); |
| | | Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()); |
| | | throw new ConfigException(msg, e); |
| | | final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ") |
| | | .append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | | |
| | |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
| | |     final ReplicationServer server) throws ChangelogException
| | | {
| | |   // retry until a replicaDB is obtained or a shutdown is requested
| | |   while (!shutdown.get())
| | |   {
| | |     final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
| | |     final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
| | |     if (result != null)
| | |     {
| | |       final Boolean dbWasCreated = result.getSecond();
| | |       if (dbWasCreated)
| | |       { // new replicaDB => update all cursors with it
| | |         // registeredDomainCursors is a plain HashMap of ArrayLists guarded by
| | |         // itself: reading and iterating it must happen under its own lock,
| | |         // like every registration and unregistration
| | |         synchronized (registeredDomainCursors)
| | |         {
| | |           final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
| | |           if (cursors != null)
| | |           {
| | |             for (DomainDBCursor cursor : cursors)
| | |             {
| | |               cursor.addReplicaDB(serverId, null);
| | |             }
| | |           }
| | |         }
| | |       }
| | | 
| | |       return result;
| | |     }
| | |     // null result: the domainMap was removed concurrently, try again
| | |   }
| | |   throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
| | | }
| | | |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap( |
| | | DN baseDN) |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN) |
| | | { |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = |
| | | domainToReplicaDBs.get(baseDN); |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another |
| | | // thread |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = |
| | | new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = |
| | | domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer server) throws ChangelogException |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists |
| | | // happy path: the replicaDB already exists |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the JEReplicaDB does not exist: take the hit and synchronize |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv); |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void shutdownCNIndexDB() throws ChangelogException |
| | | private void shutdownChangeNumberIndexDB() throws ChangelogException |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | if (dbEnv != null) |
| | | if (replicationEnv != null) |
| | | { |
| | | // wait for shutdown of the threads holding cursors |
| | | try |
| | |
| | | // do nothing: we are already shutting down |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | | if (firstException != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears all content from the changelog database, but leaves its directory on |
| | | * the filesystem. |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // 3- clear the changelogstate DB |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDN); |
| | | replicationEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */
| | | @Override
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
| | | {
| | |   final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
| | |   // register the cursor before walking the known domains, so that it is also
| | |   // told about domains created from now on
| | |   registeredMultiDomainCursors.add(cursor);
| | |   for (DN baseDN : domainToReplicaDBs.keySet())
| | |   {
| | |     // a null overall state is tolerated, as in the per-domain overload:
| | |     // the domain is then added without a starting state
| | |     final ServerState domainState = startAfterState != null ? startAfterState.getServerState(baseDN) : null;
| | |     cursor.addDomain(baseDN, domainState);
| | |   }
| | |   return cursor;
| | | }
| | | |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | /** {@inheritDoc} */
| | | @Override
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
| | | throws ChangelogException
| | | {
| | | // NOTE(review): some statements below reference names that are not declared
| | | // in this method (offlineReplicas, domainState) - confirm against the full file.
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN);
| | | if (domainState != null)
| | | // the domain cursor is created (and registered) first, then one replica is
| | | // queued on it per serverId currently known for the domain
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN);
| | | for (int serverId : getDomainMap(baseDN).keySet())
| | | {
| | | for (CSN offlineCSN : domainState)
| | | // get the last already sent CSN from that server to get a cursor
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
| | | cursor.addReplicaDB(serverId, lastCSN);
| | | }
| | | return cursor;
| | | }
| | | |
| | | /**
| | | * Creates a {@link DomainDBCursor} for the provided domain and records it in
| | | * {@code registeredDomainCursors}, creating the per-domain list on first use.
| | | * Everything happens while holding the lock on {@code registeredDomainCursors}.
| | | * <p>
| | | * NOTE(review): the inner {@code if} returning {@code offlineCSN} references
| | | * names that are not declared in this method - confirm against the full file.
| | | *
| | | * @param baseDN
| | | *          the replication domain baseDN
| | | * @return the newly created and registered cursor
| | | */
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN)
| | | {
| | | synchronized (registeredDomainCursors)
| | | {
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
| | | if (cursors == null)
| | | {
| | | if (serverId == offlineCSN.getServerId()
| | | && !startAfterServerState.cover(offlineCSN))
| | | {
| | | return offlineCSN;
| | | }
| | | // first cursor registered for this domain: create its list
| | | cursors = new ArrayList<DomainDBCursor>();
| | | registeredDomainCursors.put(baseDN, cursors);
| | | }
| | | cursors.add(cursor);
| | | return cursor;
| | | }
| | | }
| | | |
| | | /**
| | |  * Looks up the offline CSN recorded in the changelog state for a replica and
| | |  * returns it only when it is still ahead of the provided starting point.
| | |  *
| | |  * @param baseDN
| | |  *          the replication domain baseDN
| | |  * @param serverId
| | |  *          the serverId of the replica
| | |  * @param startAfterCSN
| | |  *          the starting point of the cursor being built, may be null
| | |  * @return the recorded offline CSN when one exists and {@code startAfterCSN}
| | |  *         is null or older than it, null otherwise
| | |  */
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
| | | {
| | |   final CSN recordedCSN =
| | |       replicationEnv.getChangelogState().getOfflineReplicas().getCSN(baseDN, serverId);
| | |   if (recordedCSN == null)
| | |   {
| | |     // nothing recorded for this replica
| | |     return null;
| | |   }
| | |   if (startAfterCSN != null && !startAfterCSN.isOlderThan(recordedCSN))
| | |   {
| | |     // the starting point is not older than the recorded offline CSN
| | |     return null;
| | |   }
| | |   return recordedCSN;
| | | }
| | | |
| | | /** {@inheritDoc} */
| | | @Override
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
| | | throws ChangelogException
| | | {
| | | // NOTE(review): two signatures and two pairs of return statements appear
| | | // here - confirm which ones are current against the full file.
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
| | | if (replicaDB != null)
| | | {
| | | return replicaDB.generateCursorFrom(startAfterCSN);
| | | final DBCursor<UpdateMsg> cursor =
| | | replicaDB.generateCursorFrom(startAfterCSN);
| | | // the replica cursor is wrapped together with the offline CSN computed by
| | | // getOfflineCSN(), which may be null
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
| | | // TODO JNR if (offlineCSN != null) ??
| | | // What about replicas that suddenly become offline?
| | | return new ReplicaOfflineCursor(cursor, offlineCSN);
| | | }
| | | // no replicaDB for this serverId: fall back to a shared empty cursor constant
| | | return EMPTY_CURSOR;
| | | return EMPTY_CURSOR_REPLICA_DB;
| | | }
| | | |
| | | /** {@inheritDoc} */
| | | @Override
| | | public void unregisterCursor(final DBCursor<?> cursor)
| | | {
| | |   if (cursor instanceof MultiDomainDBCursor)
| | |   {
| | |     // concurrent list: no external locking needed
| | |     registeredMultiDomainCursors.remove(cursor);
| | |   }
| | |   else if (cursor instanceof DomainDBCursor)
| | |   {
| | |     final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
| | |     // registeredDomainCursors is guarded by itself: take the same lock as
| | |     // newDomainDBCursor(), otherwise registration and unregistration can
| | |     // modify the map and its lists concurrently
| | |     synchronized (registeredDomainCursors)
| | |     {
| | |       final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
| | |       if (cursors != null)
| | |       {
| | |         cursors.remove(cursor);
| | |       }
| | |     }
| | |   }
| | |   // any other kind of cursor was never registered here: nothing to do
| | | }
| | | |
| | | /** {@inheritDoc} */
| | | @Override
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
| | | {
| | | final CSN csn = updateMsg.getCSN();
| | | // the replicaDB is looked up by the serverId carried in the message CSN,
| | | // and created when it does not exist yet
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
| | | csn.getServerId(), replicationServer);
| | | final JEReplicaDB replicaDB = pair.getFirst();
| | | replicaDB.add(updateMsg);
| | | 
| | | // the indexer is only notified when one is currently set
| | | final ChangeNumberIndexer indexer = cnIndexer.get();
| | | if (indexer != null)
| | | {
| | | notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
| | | notifyReplicaOnline(indexer, baseDN, csn.getServerId());
| | | indexer.publishUpdateMsg(baseDN, updateMsg);
| | | }
| | | // NOTE(review): two return statements appear here, and wasCreated is not
| | | // declared in this method - confirm against the full file.
| | | return wasCreated;
| | | return pair.getSecond(); // replica DB was created
| | | }
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | { |
| | | if (indexer.isReplicaOffline(baseDN, serverId)) |
| | | { |
| | | dbEnv.notifyReplicaOnline(baseDN, serverId); |
| | | replicationEnv.notifyReplicaOnline(baseDN, serverId); |
| | | } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | dbEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Cursor iterating over all the replication domains known to the changelog DB.
| | | */ |
| | | public class MultiDomainDBCursor extends CompositeDBCursor<DN>
| | | {
| | |   /** DB used to open the per-domain cursors and to unregister this cursor on close. */
| | |   private final ReplicationDomainDB domainDB;
| | |
| | |   /** Domains waiting to be incorporated, mapped to the state to start iterating after. */
| | |   private final ConcurrentSkipListMap<DN, ServerState> newDomains =
| | |       new ConcurrentSkipListMap<DN, ServerState>();
| | |   /** Domains whose cursor has been flagged for removal. */
| | |   private final ConcurrentSkipListSet<DN> removeDomains =
| | |       new ConcurrentSkipListSet<DN>();
| | |
| | |   /**
| | |    * Creates a cursor spanning replication domains.
| | |    *
| | |    * @param domainDB
| | |    *          the replication domain management DB
| | |    */
| | |   public MultiDomainDBCursor(ReplicationDomainDB domainDB)
| | |   {
| | |     this.domainDB = domainDB;
| | |   }
| | |
| | |   /**
| | |    * Registers a replication domain that this cursor must iterate over. The
| | |    * domain is only recorded here; its cursor is created and iterated over on
| | |    * the next call to {@link #next()}.
| | |    *
| | |    * @param baseDN
| | |    *          the replication domain's baseDN
| | |    * @param startAfterState
| | |    *          the {@link ServerState} after which to start iterating, an empty
| | |    *          {@link ServerState} is used when this is null
| | |    */
| | |   public void addDomain(DN baseDN, ServerState startAfterState)
| | |   {
| | |     ServerState effectiveState = startAfterState;
| | |     if (effectiveState == null)
| | |     {
| | |       // no starting point supplied: fall back to an empty state
| | |       effectiveState = new ServerState();
| | |     }
| | |     newDomains.put(baseDN, effectiveState);
| | |   }
| | |
| | |   /**
| | |    * Opens a domain cursor for every pending domain, adds it to this composite
| | |    * cursor, then drops the domain from the pending map.
| | |    *
| | |    * @throws ChangelogException
| | |    *           if a domain cursor cannot be obtained or added
| | |    */
| | |   @Override
| | |   protected void incorporateNewCursors() throws ChangelogException
| | |   {
| | |     final Iterator<Entry<DN, ServerState>> pending = newDomains.entrySet().iterator();
| | |     while (pending.hasNext())
| | |     {
| | |       final Entry<DN, ServerState> pendingDomain = pending.next();
| | |       final DN domainDN = pendingDomain.getKey();
| | |       final ServerState startAfter = pendingDomain.getValue();
| | |       final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(domainDN, startAfter);
| | |       addCursor(cursor, domainDN);
| | |       // only forget the pending domain once its cursor has been added
| | |       pending.remove();
| | |     }
| | |   }
| | |
| | |   /**
| | |    * Flags a replication domain so that this cursor stops iterating over it.
| | |    * The removal becomes effective on the next call to {@link #next()}.
| | |    *
| | |    * @param baseDN
| | |    *          the replication domain's baseDN
| | |    */
| | |   public void removeDomain(DN baseDN)
| | |   {
| | |     removeDomains.add(baseDN);
| | |   }
| | |
| | |   /**
| | |    * Tells whether the provided domain has been flagged for removal.
| | |    *
| | |    * @param baseDN
| | |    *          the replication domain's baseDN
| | |    * @return true if {@link #removeDomain(DN)} flagged this domain and the flag
| | |    *         has not been cleared yet
| | |    */
| | |   @Override
| | |   protected boolean isCursorNoLongerNeededFor(DN baseDN)
| | |   {
| | |     return removeDomains.contains(baseDN);
| | |   }
| | |
| | |   /**
| | |    * Clears the removal flag of the provided domain.
| | |    *
| | |    * @param baseDN
| | |    *          the replication domain's baseDN
| | |    */
| | |   @Override
| | |   protected void cursorRemoved(DN baseDN)
| | |   {
| | |     removeDomains.remove(baseDN);
| | |   }
| | |
| | |   /**
| | |    * Closes this cursor, unregisters it from the domain DB and discards any
| | |    * pending domain addition or removal.
| | |    */
| | |   @Override
| | |   public void close()
| | |   {
| | |     super.close();
| | |     domainDB.unregisterCursor(this);
| | |     newDomains.clear();
| | |     removeDomains.clear();
| | |   }
| | | }
| | |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | | @Mock |
| | | private ReplicationDomainDB domainDB; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors; |
| | | |
| | | private List<DN> eclEnabledDomains; |
| | | private MultiDomainDBCursor multiDomainCursor; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors; |
| | | private Map<DN, DomainDBCursor> domainDBCursors; |
| | | private ChangelogState initialState; |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | | private ChangeNumberIndexer cnIndexer; |
| | |
| | | public void setup() throws Exception |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn( |
| | | multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | { |
| | | startCNIndexer(BASE_DN1); |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | setCNIndexDBInitialRecords(msg1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // simulate messages received out of order |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDifferentDomains() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN2, serverId2); |
| | | startCNIndexer(BASE_DN1, BASE_DN2); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | setCNIndexDBInitialRecords(msg1, msg2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | addReplica(BASE_DN1, serverId3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // cn=admin data does not participate in the external changelog
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneGoingOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | |
| | | // blocked until we receive info for serverId2 |
| | | assertExternalChangelogContent(); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg2, msg3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // MCP moves forward because serverId1 is not really offline |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneKilled() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | | cursors.put(Pair.of(baseDN, serverId), cursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(cursor); |
| | | final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); |
| | | replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); |
| | | |
| | | if (isECLEnabledDomain2(baseDN)) |
| | | { |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class))) |
| | | .thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | | getDomainNewestCSNs(baseDN)); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | |
| | | return serverState; |
| | | } |
| | | |
| | | private void startCNIndexer(DN... eclEnabledDomains) |
| | | private void startCNIndexer() |
| | | { |
| | | final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains); |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) |
| | | { |
| | | @Override |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | | return eclEnabledDomainList.contains(baseDN); |
| | | return isECLEnabledDomain2(baseDN); |
| | | } |
| | | |
| | | }; |
| | | cnIndexer.start(); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private boolean isECLEnabledDomain2(DN baseDN) |
| | | { |
| | | return eclEnabledDomains.contains(baseDN); |
| | | } |
| | | |
| | | private void stopCNIndexer() throws Exception |
| | | { |
| | | if (cnIndexer != null) |
| | |
| | | final CSN csn = newestMsg.getCSN(); |
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); |
| | | final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | final SequentialDBCursor cursor = |
| | | replicaDBCursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | } |
| | | initialCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | if (msg.isEmptyCursor()) |
| | | { |
| | | cursor.add(null); |
| | |
| | | }; |
| | | } |
| | | |
| | | /**
| | |  * Checks that the indexer returns the expected preceding CSN for each
| | |  * (start, expected) pair supplied by the data provider.
| | |  */
| | | @Test(dataProvider = "precedingCSNDataProvider")
| | | public void getPrecedingCSN(CSN start, CSN expected)
| | | {
| | |   // the indexer is only built here, it is never started
| | |   cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
| | |   assertThat(cnIndexer.getPrecedingCSN(start)).isEqualTo(expected);
| | | }
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | |
| | | public class CompositeDBCursorTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | /**
| | |  * Minimal {@link CompositeDBCursor} subclass for these tests: it never
| | |  * incorporates new cursors by itself and never reports a cursor as no longer
| | |  * needed.
| | |  */
| | | private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
| | | {
| | | /** Does nothing. */
| | | @Override
| | | protected void incorporateNewCursors() throws ChangelogException
| | | {
| | | }
| | | 
| | | /** Always returns false. */
| | | @Override
| | | protected boolean isCursorNoLongerNeededFor(String data)
| | | {
| | | return false;
| | | }
| | | 
| | | /** Does nothing. */
| | | @Override
| | | protected void cursorRemoved(String data)
| | | {
| | | }
| | | }
| | | |
| | | private UpdateMsg msg1; |
| | | private UpdateMsg msg2; |
| | | private UpdateMsg msg3; |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | // TODO : this test fails because msg2 is returned twice |
| | | @Test(enabled=false) |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, String> cursorsMap = |
| | | new HashMap<DBCursor<UpdateMsg>, String>(); |
| | | final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor(); |
| | | for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs) |
| | | { |
| | | // The cursors in the composite are expected to be pointing |
| | | // to first record available |
| | | pair.getFirst().next(); |
| | | cursorsMap.put(pair.getFirst(), pair.getSecond()); |
| | | cursor.addCursor(pair.getFirst(), pair.getSecond()); |
| | | } |
| | | return new CompositeDBCursor<String>(cursorsMap, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |