| | |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | */ |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param startAfterState |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param startAfterServerState |
| | | * @param startAfterState |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Unregisters the provided cursor from this replication domain. |
| | | * |
| | | * @param cursor |
| | | * the cursor to unregister. |
| | | */ |
| | | void unregisterCursor(DBCursor<?> cursor); |
| | | |
| | | /** |
| | | * Publishes the provided change to the changelog DB for the specified |
| | | * serverId and replication domain. After a change has been successfully |
| | | * published, it becomes available to be returned by the External ChangeLog. |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | */ |
| | |
| | | * |
| | | * @NonNull |
| | | */ |
| | | @SuppressWarnings("unchecked") |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor = |
| | | new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | | * i.e. from the same thread that will make use of them. If this rule is not |
| | | * obeyed, then a JE exception will be thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** |
| | | * Holds the newCursors that will have to be created in the next iteration |
| | | * inside the {@link #run()} method. |
| | | * <p> |
| | | * This map can be updated by multiple threads. |
| | | */ |
| | | private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( |
| | | new Comparator<Pair<DN, Integer>>() |
| | | { |
| | | @Override |
| | | public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2) |
| | | { |
| | | final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); |
| | | if (compareBaseDN == 0) |
| | | { |
| | | return o1.getSecond().compareTo(o2.getSecond()); |
| | | } |
| | | return compareBaseDN; |
| | | } |
| | | }); |
| | | private MultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | return; |
| | | } |
| | | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | lastAliveCSNs.update(baseDN, updateMsg.getCSN()); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | continue; |
| | | } |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | } |
| | | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | final UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | if (record instanceof ReplicaOfflineMsg) |
| | | { |
| | | // ignore: replica offline messages are never stored in the CNIndexDB |
| | | nextChangeForInsertDBCursor.next(); |
| | | record = nextChangeForInsertDBCursor.getRecord(); |
| | | } |
| | | |
| | | // sanity check: ensure that when initializing the cursors at the previous |
| | | // cookie, the next change we find is the newest record in the CNIndexDB |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord |
| | | .getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get( |
| | | newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | } |
| | | // Now we can update the mediumConsistencyRUV |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | |
| | | return new CSN(0, 0, serverId); |
| | | } |
| | | |
| | | private void resetNextChangeForInsertDBCursor() throws ChangelogException |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, DN> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry |
| | | : this.allCursors.entrySet()) |
| | | { |
| | | for (Entry<Integer, DBCursor<UpdateMsg>> entry2 |
| | | : entry.getValue().entrySet()) |
| | | { |
| | | cursors.put(entry2.getValue(), entry.getKey()); |
| | | } |
| | | } |
| | | |
| | | // CNIndexer manages the cursor itself, |
| | | // so do not try to recycle exhausted cursors |
| | | CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false); |
| | | result.next(); |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | | { |
| | | map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>(); |
| | | allCursors.put(baseDN, map); |
| | | } |
| | | DBCursor<UpdateMsg> cursor = map.get(serverId); |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | cursor.next(); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Returns the immediately preceding CSN. |
| | | * |
| | | * @param csn |
| | | * the CSN to use |
| | | * @return the immediately preceding CSN or null if the provided CSN is null. |
| | | */ |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn == null) |
| | | { |
| | | return null; |
| | | } |
| | | if (csn.getSeqnum() > 0) |
| | | { |
| | | return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); |
| | | } |
| | | return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | |
| | | { |
| | | /* |
| | | * initialize here to allow fast application start up and avoid errors due |
| | | * cursors being created in a different thread to the one where they are |
| | | * used. |
| | | * cursors being created in a different thread to the one where they are used. |
| | | */ |
| | | initialize(); |
| | | |
| | |
| | | { |
| | | if (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN cursorData = nextChangeForInsertDBCursor.getData(); |
| | | final boolean callNextOnCursor = |
| | | cursorData == null || domainsToClear.contains(cursorData); |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | removeCursors(baseDNToClear); |
| | | nextChangeForInsertDBCursor.removeDomain(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | else |
| | | { |
| | | final boolean createdCursors = createNewCursors(); |
| | | final boolean recycledCursors = recycleExhaustedCursors(); |
| | | if (createdCursors || recycledCursors) |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | // The next change to consume comes from a domain to be removed. |
| | | // Call DBCursor.next() to ensure this domain is removed |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | // Do not call DBCursor.next() here |
| | | // because we might not have consumed the last record, |
| | | // for example if we could not move the MCP forward |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | // check whether new changes have been added to the ReplicaDBs |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | else if (msg instanceof ReplicaOfflineMsg) |
| | | { |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // LocalizableMessage logged here gives corrective information to the administrator. |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // LocalizableMessage logged here gives corrective information to the administrator. |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw new RuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | removeCursors(DN.NULL_DN); |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Nothing can be done about it. |
| | | * <p> |
| | | * Rely on the DirectoryThread uncaught exceptions handler for logging error + |
| | | * alert. |
| | | * <p> |
| | | * Message logged here gives corrective information to the administrator. |
| | | */ |
| | | private void logUnexpectedException(Exception e) |
| | | { |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | boolean callNextOnCursor = true; |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | pair = getCursor(mcBaseDN, mcCSN.getServerId()); |
| | | Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond(); |
| | | if (iter != null && !iter.hasNext()) |
| | | { |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | iter.remove(); |
| | | StaticUtils.close(pair.getFirst()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | callNextOnCursor = false; |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | // TODO JNR how to close cursor for offline replica? |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | private void removeCursors(DN baseDN) |
| | | { |
| | | if (nextChangeForInsertDBCursor != null) |
| | | { |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | if (DN.NULL_DN.equals(baseDN)) |
| | | { |
| | | // close all cursors |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | } |
| | | else |
| | | { |
| | | // close cursors for this DN |
| | | final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN); |
| | | if (map != null) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();) |
| | | { |
| | | if (it.next().getFirst().equals(baseDN)) |
| | | { |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | getCursor(final DN baseDN, final int serverId) throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | | { |
| | | if (baseDN.equals(entry1.getKey())) |
| | | { |
| | | for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (serverId == entry2.getKey()) |
| | | { |
| | | return Pair.of(entry2.getValue(), iter); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return Pair.empty(); |
| | | } |
| | | |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | boolean succesfullyRecycled = false; |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : map.values()) |
| | | { |
| | | // try to recycle it by calling next() |
| | | if (cursor.getRecord() == null && cursor.next()) |
| | | { |
| | | succesfullyRecycled = true; |
| | | } |
| | | } |
| | | } |
| | | return succesfullyRecycled; |
| | | } |
| | | |
| | | private boolean createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = |
| | | newCursors.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | // start after preceding CSN so the first CSN read will exactly be the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | | iter.remove(); |
| | | } |
| | | return newCursorAdded; |
| | | } |
| | | return false; |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | * <p> |
| | | * New cursors for this Map must be created from the same thread that will |
| | | * make use of them. When this rule is not obeyed, a JE exception will be |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | private final TreeMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | |
| | | // If previous state was ready, then we must advance the first cursor |
| | | // (which UpdateMsg has been consumed). |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = |
| | | state != UNINITIALIZED ? cursors.pollFirstEntry() : null; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | recycleExhaustedCursors(); |
| | | if (cursorToAdvance != null) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. |
| | | addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue()); |
| | | } |
| | | |
| | | removeNoLongerNeededCursors(); |
| | | incorporateNewCursors(); |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | if (!exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle exhausted cursors in case the underlying replica DBs received new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | { |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); |
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey())) |
| | | { |
| | | // if the first cursor was previously an exhausted cursor, |
| | | // then we have already called next() on it. |
| | | // Avoid calling it again because we know new changes have been found. |
| | | return true; |
| | | addCursor(entry.getKey(), entry.getValue()); |
| | | } |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and add again the cursor after moving it forward. |
| | | if (advanceNonExhaustedCursors) |
| | | { |
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); |
| | | if (firstEntry != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); |
| | | cursor.next(); |
| | | put(firstEntry); |
| | | } |
| | | } |
| | | // no cursors are left with changes. |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | private void removeNoLongerNeededCursors() |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | final Data data = entry.getValue(); |
| | | if (cursor.getRecord() != null) |
| | | for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();) |
| | | { |
| | | final Data dataToFind = iter.next(); |
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter = |
| | | cursors.entrySet().iterator(); cursorIter.hasNext();) |
| | | { |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next(); |
| | | if (dataToFind.equals(entry.getValue())) |
| | | { |
| | | entry.getKey().close(); |
| | | cursorIter.remove(); |
| | | } |
| | | } |
| | | iter.remove(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns an Iterator over the data associated to cursors that must be removed. |
| | | * |
| | | * @return an Iterator over the data associated to cursors that must be removed. |
| | | */ |
| | | protected abstract Iterator<Data> removedCursorsIterator(); |
| | | |
| | | /** |
| | | * Adds a cursor to this composite cursor. It first calls |
| | | * {@link DBCursor#next()} to verify whether it is exhausted or not. |
| | | * |
| | | * @param cursor |
| | | * the cursor to add to this composite |
| | | * @param data |
| | | * the data associated to the provided cursor |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | | this.cursors.put(cursor, data); |
| | | } |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | // Cannot call incorporateNewCursors() here because |
| | | // somebody might have already called DBCursor.getRecord() and read the record |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called when implementors should incorporate new cursors into the current |
| | | * composite DBCursor. Implementors should call |
| | | * {@link #addCursor(DBCursor, Object)} to do so. |
| | | * |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected abstract void incorporateNewCursors() throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | |
| | | @Override |
| | | public void close() |
| | | { |
| | | state = CLOSED; |
| | | StaticUtils.close(cursors.keySet()); |
| | | StaticUtils.close(exhaustedCursors.keySet()); |
| | | cursors.clear(); |
| | | exhaustedCursors.clear(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Cursor iterating over a replication domain's replica DBs. |
| | | */ |
| | | public class DomainDBCursor extends CompositeDBCursor<Void> |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = |
| | | new ConcurrentSkipListMap<Integer, CSN>(); |
| | | /** |
| | | * Replaces null CSNs in ConcurrentSkipListMap that does not support null values. |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | */ |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication domain baseDN of this cursor. |
| | | * |
| | | * @return the replication domain baseDN of this cursor. |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Adds a replicaDB for this cursor to iterate over. Added cursors will be |
| | | * created and iterated over on the next call to {@link #next()}. |
| | | * |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startAfterCSN |
| | | * the CSN after which to start iterating |
| | | */ |
| | | public void addReplicaDB(int serverId, CSN startAfterCSN) |
| | | { |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, CSN> pair = iter.next(); |
| | | final int serverId = pair.getKey(); |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | @SuppressWarnings("unchecked") |
| | | protected Iterator<Void> removedCursorsIterator() |
| | | { |
| | | return Collections.EMPTY_LIST.iterator(); // nothing to remove |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | super.close(); |
| | | domainDB.unregisterCursor(this); |
| | | newReplicas.clear(); |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a JEReplicaDB, synchronize on the domainMap to avoid |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServerCfg config; |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new DBCursor<UpdateMsg>() |
| | | { |
| | | |
| | |
| | | }; |
| | | |
| | | /** |
| | | * Builds an instance of this class. |
| | | * Creates a new changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public JEChangelogDB(ReplicationServer replicationServer, |
| | | ReplicationServerCfg config) throws ConfigException |
| | | public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | { |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | | private File makeDir(final String dbDirName) throws ConfigException |
| | | { |
| | | // Check that this path exists or create it. |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | |
| | | { |
| | | logger.traceException(e); |
| | | |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); |
| | | mb.append(e.getLocalizedMessage()); |
| | | mb.append(" "); |
| | | mb.append(dbDirectory); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e); |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( |
| | | e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | | |
| | | private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN) |
| | | private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN) |
| | | { |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private JEReplicaDB getReplicaDB(DN baseDN, int serverId) |
| | | private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId) |
| | | { |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Provision resources for the specified serverId in the specified replication |
| | | * domain. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain where to add the serverId |
| | | * @param serverId |
| | | * the server Id to add to the replication domain |
| | | * @throws ChangelogException |
| | | * If a database error happened. |
| | | */ |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateReplicaDB(baseDN, serverId, rs); |
| | | } |
| | | |
| | | /** |
| | | * Returns a {@link JEReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer server) throws ChangelogException |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, |
| | | final ReplicationServer server) throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | { |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = |
| | | getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | final Boolean dbWasCreated = result.getSecond(); |
| | | if (dbWasCreated) |
| | | { // new replicaDB => update all cursors with it |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (DomainDBCursor cursor : cursors) |
| | | { |
| | | cursor.addReplicaDB(serverId, null); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return result; |
| | | } |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | } |
| | | |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap( |
| | | DN baseDN) |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN) |
| | | { |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = |
| | | domainToReplicaDBs.get(baseDN); |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another |
| | | // thread |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = |
| | | new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = |
| | | domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | if (MultimasterReplication.isECLEnabledDomain(baseDN)) |
| | | { |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer server) throws ChangelogException |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists |
| | | // happy path: the replicaDB already exists |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the JEReplicaDB does not exist: take the hit and synchronize |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv); |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | | commission(entry.getKey(), serverId, replicationServer); |
| | | getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void shutdownCNIndexDB() throws ChangelogException |
| | | private void shutdownChangeNumberIndexDB() throws ChangelogException |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | if (dbEnv != null) |
| | | if (replicationEnv != null) |
| | | { |
| | | // wait for shutdown of the threads holding cursors |
| | | try |
| | |
| | | // do nothing: we are already shutting down |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | | if (firstException != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears all content from the changelog database, but leaves its directory on |
| | | * the filesystem. |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // 3- clear the changelogstate DB |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDN); |
| | | replicationEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | { |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | for (CSN offlineCSN : domainState) |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | cursors = new ArrayList<DomainDBCursor>(); |
| | | registeredDomainCursors.put(baseDN, cursors); |
| | | } |
| | | cursors.add(cursor); |
| | | return cursor; |
| | | } |
| | | } |
| | | |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final MultiDomainServerState offlineReplicas = |
| | | replicationEnv.getChangelogState().getOfflineReplicas(); |
| | | final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId); |
| | | if (offlineCSN != null |
| | | && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN))) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | return null; |
| | | } |
| | |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | | } |
| | | return EMPTY_CURSOR; |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | | { |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | updateMsg.getCSN().getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | if (cursor instanceof MultiDomainDBCursor) |
| | | { |
| | | registeredMultiDomainCursors.remove(cursor); |
| | | } |
| | | else if (cursor instanceof DomainDBCursor) |
| | | { |
| | | final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | | synchronized (registeredMultiDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | csn.getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | replicaDB.add(updateMsg); |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | | return pair.getSecond(); // replica DB was created |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | dbEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | replicationEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Cursor iterating over a all the replication domain known to the changelog DB. |
| | | */ |
| | | public class MultiDomainDBCursor extends CompositeDBCursor<DN> |
| | | { |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | private final ConcurrentSkipListMap<DN, ServerState> newDomains = |
| | | new ConcurrentSkipListMap<DN, ServerState>(); |
| | | private final ConcurrentSkipListSet<DN> removeDomains = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | |
| | | /** |
| | | * Builds a MultiDomainDBCursor instance. |
| | | * |
| | | * @param domainDB |
| | | * the replication domain management DB |
| | | */ |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB) |
| | | { |
| | | this.domainDB = domainDB; |
| | | } |
| | | |
| | | /** |
| | | * Adds a replication domain for this cursor to iterate over. Added cursors |
| | | * will be created and iterated over on the next call to {@link #next()}. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | * @param startAfterState |
| | | * the {@link ServerState} after which to start iterating |
| | | */ |
| | | public void addDomain(DN baseDN, ServerState startAfterState) |
| | | { |
| | | newDomains.put(baseDN, |
| | | startAfterState != null ? startAfterState : new ServerState()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator(); |
| | | iter.hasNext();) |
| | | { |
| | | final Entry<DN, ServerState> entry = iter.next(); |
| | | final DN baseDN = entry.getKey(); |
| | | final ServerState serverState = entry.getValue(); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState); |
| | | addCursor(domainDBCursor, baseDN); |
| | | iter.remove(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Removes a replication domain from this cursor and stops iterating over it. |
| | | * Removed cursors will be effectively removed on the next call to |
| | | * {@link #next()}. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | */ |
| | | public void removeDomain(DN baseDN) |
| | | { |
| | | removeDomains.add(baseDN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected Iterator<DN> removedCursorsIterator() |
| | | { |
| | | return removeDomains.iterator(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | super.close(); |
| | | domainDB.unregisterCursor(this); |
| | | newDomains.clear(); |
| | | removeDomains.clear(); |
| | | } |
| | | |
| | | } |
| | |
| | | public class ExternalChangeLogTest extends ReplicationTestCase |
| | | { |
| | | |
| | | private static class Results |
| | | { |
| | | |
| | | public final List<SearchResultEntryProtocolOp> searchResultEntries = |
| | | new ArrayList<SearchResultEntryProtocolOp>(); |
| | | public long searchReferences; |
| | | public long searchesDone; |
| | | |
| | | } |
| | | |
| | | private static final int SERVER_ID_1 = 1201; |
| | | private static final int SERVER_ID_2 = 1202; |
| | | |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "PrimaryTest"}) |
| | | public void TestWithAndWithoutControl() throws Exception |
| | | { |
| | | final String tn = "TestWithAndWithoutControl"; |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | ECLCompatWriteReadAllOps(1, tn); |
| | | |
| | | ECLCompatNoControl(1); |
| | | |
| | | // Write additional changes and read ECL from a provided change number |
| | | ECLCompatWriteReadAllOps(5); |
| | | ECLCompatWriteReadAllOps(5, tn); |
| | | } |
| | | |
| | | @Test(enabled=false, dependsOnMethods = { "PrimaryTest"}) |
| | |
| | | @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"}) |
| | | public void ECLReplicationServerFullTest15() throws Exception |
| | | { |
| | | final String tn = "ECLReplicationServerFullTest15"; |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write 4 changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | ECLCompatWriteReadAllOps(1, tn); |
| | | |
| | | // Write 4 additional changes and read ECL from a provided change number |
| | | CSN csn = ECLCompatWriteReadAllOps(5); |
| | | CSN csn = ECLCompatWriteReadAllOps(5, tn); |
| | | |
| | | // Test request from a provided change number - read 6 |
| | | ECLCompatReadFrom(6, csn); |
| | |
| | | |
| | | final CSN[] csns = generateCSNs(3, SERVER_ID_1); |
| | | publishDeleteMsgInOTest(server01, csns[0], testName, 1); |
| | | |
| | | Thread.sleep(1000); |
| | | |
| | | // Test that last cookie has been updated |
| | | String cookieNotEmpty = readLastCookie(); |
| | | debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\""); |
| | | |
| | | final String firstCookie = assertLastCookieDifferentThanLastValue(""); |
| | | String lastCookie = firstCookie; |
| | | publishDeleteMsgInOTest(server01, csns[1], testName, 2); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | publishDeleteMsgInOTest(server01, csns[2], testName, 3); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | |
| | | // --- |
| | | // 2. Now set up a very short purge delay on the replication changelogs |
| | |
| | | // returns the appropriate error. |
| | | debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN)); |
| | | debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2)); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM); |
| | | assertTrue(searchOp.getErrorMessage().toString().startsWith( |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()), |
| | | searchOp.getErrorMessage().toString()); |
| | |
| | | |
| | | final CSN[] csns = generateCSNs(3, SERVER_ID_1); |
| | | publishDeleteMsgInOTest(server01, csns[0], testName, 1); |
| | | |
| | | Thread.sleep(1000); |
| | | |
| | | // Test that last cookie has been updated |
| | | String cookieNotEmpty = readLastCookie(); |
| | | debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\""); |
| | | |
| | | final String firstCookie = assertLastCookieDifferentThanLastValue(""); |
| | | String lastCookie = firstCookie; |
| | | publishDeleteMsgInOTest(server01, csns[1], testName, 2); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | publishDeleteMsgInOTest(server01, csns[2], testName, 3); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | |
| | | // --- |
| | | // 2. Now remove the domain by sending a reset message |
| | | ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657); |
| | | server01.publish(msg); |
| | | server01.publish(new ResetGenerationIdMsg(23657)); |
| | | |
| | | // --- |
| | | // 3. Assert that a request with an empty cookie returns nothing |
| | | // since replication changelog has been cleared |
| | | String cookie= ""; |
| | | InternalSearchOperation searchOp = null; |
| | | searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | |
| | | // --- |
| | |
| | | // since replication changelog has been cleared |
| | | cookie = readLastCookie(); |
| | | debugInfo(testName, "2. Search with last cookie=" + cookie + "\""); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | |
| | | // --- |
| | | // 5. Assert that a request with an "old" cookie - one that refers to |
| | |
| | | // returns the appropriate error. |
| | | debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN)); |
| | | debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2)); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM); |
| | | final InternalSearchOperation searchOp = |
| | | searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM); |
| | | assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString()); |
| | | } |
| | | finally |
| | |
| | | debugInfo(testName, "Ending test successfully"); |
| | | } |
| | | |
| | | private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception |
| | | { |
| | | int cnt = 0; |
| | | while (cnt < 100) |
| | | { |
| | | final String newCookie = readLastCookie(); |
| | | if (!newCookie.equals(lastCookie)) |
| | | { |
| | | return newCookie; |
| | | } |
| | | cnt++; |
| | | Thread.sleep(10); |
| | | } |
| | | Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'"); |
| | | return null;// dead code |
| | | } |
| | | |
| | | private void debugAndWriteEntries(LDIFWriter ldifWriter, |
| | | List<SearchResultEntry> entries, String tn) throws Exception |
| | | { |
| | |
| | | |
| | | // Publish ADD |
| | | csnCounter++; |
| | | String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" + "objectClass: domain\n" |
| | | + "entryUUID: "+user1entryUUID+"\n"; |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | Entry entry = TestCaseUtils.entryFromLdifString( |
| | | "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryUUID: " + user1entryUUID + "\n"); |
| | | AddMsg addMsg = new AddMsg( |
| | | csns[csnCounter], |
| | | DN.valueOf("uid="+tn+"2," + TEST_ROOT_DN_STRING), |
| | |
| | | |
| | | InvocationCounterPlugin.resetAllCounters(); |
| | | |
| | | long searchEntries; |
| | | long searchReferences = ldapStatistics.getSearchResultReferences(); |
| | | long searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | final Results results = new Results(); |
| | | results.searchReferences = ldapStatistics.getSearchResultReferences(); |
| | | results.searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | |
| | | debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)"); |
| | | LDAPMessage message = new LDAPMessage(2, searchRequest, controls); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(2, searchRequest, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | if (!changesOnly) |
| | | { |
| | | // Wait for change 1 |
| | | debugInfo(tn, "Waiting for init search expected to return change 1"); |
| | | searchEntries = 0; |
| | | readMessages(tn, r, results, 1, "Init search Result="); |
| | | for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries) |
| | | { |
| | | while (searchEntries < 1 && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Init search Result=" + |
| | | message.getProtocolOpType() + message + " " + searchEntries); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | // FIXME:ECL Double check 1 is really the valid value here. |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"1":"0")); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | // FIXME:ECL Double check 1 is really the valid value here. |
| | | final String cn = compatMode ? "1" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "INIT search done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | |
| | | // Produces change 2 |
| | |
| | | " published , psearch will now wait for new entries"); |
| | | |
| | | // wait for the 1 new entry |
| | | searchEntries = 0; |
| | | SearchResultEntryProtocolOp searchResultEntry = null; |
| | | while (searchEntries < 1 && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search Result=" + |
| | | message.getProtocolOpType() + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchResultEntry = message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r, results, 1, "psearch search Result="); |
| | | SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0); |
| | | Thread.sleep(1000); |
| | | |
| | | // Check we received change 2 |
| | |
| | | createSearchRequest("(targetDN=*directpsearch*,o=test)", null); |
| | | |
| | | debugInfo(tn, "ACI test : sending search"); |
| | | message = new LDAPMessage(2, searchRequest, createCookieControl("")); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl(""))); |
| | | |
| | | searchesDone=0; |
| | | searchEntries = 0; |
| | | LDAPMessage message; |
| | | int searchesDone = 0; |
| | | int searchEntries = 0; |
| | | int searchReferences = 0; |
| | | while ((searchesDone==0) && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "ACI test : message returned " + |
| | |
| | | |
| | | InvocationCounterPlugin.resetAllCounters(); |
| | | |
| | | ldapStatistics.getSearchRequests(); |
| | | long searchEntries = ldapStatistics.getSearchResultEntries(); |
| | | ldapStatistics.getSearchResultReferences(); |
| | | long searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | final Results results = new Results(); |
| | | results.searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | |
| | | LDAPMessage message; |
| | | message = new LDAPMessage(2, searchRequest1, controls); |
| | | w1.writeMessage(message); |
| | | w1.writeMessage(new LDAPMessage(2, searchRequest1, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | message = new LDAPMessage(2, searchRequest2, controls); |
| | | w2.writeMessage(message); |
| | | w2.writeMessage(new LDAPMessage(2, searchRequest2, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | message = new LDAPMessage(2, searchRequest3, controls); |
| | | w3.writeMessage(message); |
| | | w3.writeMessage(new LDAPMessage(2, searchRequest3, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | if (!changesOnly) |
| | | { |
| | | debugInfo(tn, "Search1 Persistent filter=" + searchRequest1.getFilter() |
| | | + " expected to return change " + csn1); |
| | | searchEntries = 0; |
| | | message = null; |
| | | |
| | | { |
| | | while (searchEntries < 1 && (message = r1.readMessage()) != null) |
| | | readMessages(tn, r1, results, 1, "Search1 Result="); |
| | | final int searchEntries = results.searchResultEntries.size(); |
| | | if (searchEntries == 1) |
| | | { |
| | | debugInfo(tn, "Search1 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | if (searchEntries==1) |
| | | { |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString()); |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"10":"0")); |
| | | } |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1); |
| | | final String cn = compatMode ? "10" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString()); |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "Search1 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | debugInfo(tn, "Search1 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | searchEntries = 0; |
| | | message = null; |
| | | { |
| | | debugInfo(tn, "Search 2 Persistent filter=" + searchRequest2.getFilter() |
| | | + " expected to return change " + csn2 + " & " + csn3); |
| | | while (searchEntries < 2 && (message = r2.readMessage()) != null) |
| | | readMessages(tn, r2, results, 2, "Search 2 Result="); |
| | | for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries) |
| | | { |
| | | debugInfo(tn, "Search 2 Result=" + |
| | | message.getProtocolOpType() + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"10":"0")); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | final String cn = compatMode ? "10" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "Search2 done with success. searchEntries=" |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | debugInfo(tn, "Search2 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | |
| | | searchEntries = 0; |
| | | message = null; |
| | | { |
| | | debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter() |
| | | + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3); |
| | | while (searchEntries < 4 && (message = r3.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Search3 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter() |
| | | + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3); |
| | | readMessages(tn, r3, results, 4, "Search3 Result="); |
| | | debugInfo(tn, "Search3 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | |
| | | // Produces additional change |
| | |
| | | debugInfo(tn, delMsg13.getCSN() + " published additionally "); |
| | | |
| | | // wait 11 |
| | | searchEntries = 0; |
| | | message = null; |
| | | while (searchEntries < 1 && (message = r1.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Search 11 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r1, results, 1, "Search 11 Result="); |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Search 1 successfully receives additional changes"); |
| | | |
| | | // wait 12 & 13 |
| | | searchEntries = 0; |
| | | message = null; |
| | | while (searchEntries < 2 && (message = r2.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search 12 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r2, results, 2, "psearch search 12 Result="); |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Search 2 successfully receives additional changes"); |
| | | |
| | | // wait 11 & 12 & 13 |
| | | searchEntries = 0; |
| | | SearchResultEntryProtocolOp searchResultEntry = null; |
| | | message = null; |
| | | while (searchEntries < 3 && (message = r3.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search 13 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchResultEntry = message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r3, results, 3, "psearch search 13 Result="); |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | results.searchResultEntries.get(results.searchResultEntries.size() - 1); |
| | | Thread.sleep(1000); |
| | | |
| | | // Check we received change 13 |
| | |
| | | debugInfo(tn, "Ends test successfully"); |
| | | } |
| | | |
| | | private void readMessages(String tn, org.opends.server.tools.LDAPReader r, |
| | | final Results results, final int i, final String string) throws Exception |
| | | { |
| | | results.searchResultEntries.clear(); |
| | | |
| | | LDAPMessage message; |
| | | while (results.searchResultEntries.size() < i |
| | | && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, string + message.getProtocolOpType() + " " + message); |
| | | |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | results.searchResultEntries.add(message.getSearchResultEntryProtocolOp()); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | results.searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | results.searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void assertSuccessful(LDAPMessage message) |
| | | { |
| | | SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp(); |
| | |
| | | new BindRequestProtocolOp( |
| | | ByteString.valueOf(bindDN), |
| | | 3, ByteString.valueOf(password)); |
| | | LDAPMessage message = new LDAPMessage(1, bindRequest); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(1, bindRequest)); |
| | | |
| | | message = r.readMessage(); |
| | | final LDAPMessage message = r.readMessage(); |
| | | BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp(); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | assertEquals(bindResponse.getResultCode(), expected); |
| | |
| | | debugInfo(tn, "Ending test successfully"); |
| | | } |
| | | |
| | | private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception |
| | | private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception |
| | | { |
| | | String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber; |
| | | String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | LDAPReplicationDomain domain = null; |
| | | try |
| | |
| | | CSN[] csns = generateCSNs(4, SERVER_ID_1); |
| | | |
| | | // Publish DEL |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID); |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | |
| | | // Publish ADD |
| | | String lentry = |
| | | "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n" |
| | | Entry entry = TestCaseUtils.entryFromLdifString( |
| | | "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryUUID: "+user1entryUUID+"\n"; |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | + "entryUUID: " + user1entryUUID + "\n"); |
| | | AddMsg addMsg = new AddMsg( |
| | | csns[1], |
| | | entry.getName(), |
| | |
| | | debugInfo(tn, " publishes " + addMsg.getCSN()); |
| | | |
| | | // Publish MOD |
| | | DN baseDN = DN.valueOf("uid="+tn+"3," + TEST_ROOT_DN_STRING); |
| | | DN baseDN = DN.valueOf("uid="+tn+"-3," + TEST_ROOT_DN_STRING); |
| | | List<Modification> mods = createMods("description", "new value"); |
| | | ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID); |
| | | server01.publish(modMsg); |
| | |
| | | |
| | | // Publish modDN |
| | | ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null, |
| | | DN.valueOf("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN |
| | | DN.valueOf("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN |
| | | RDN.decode("uid="+tn+"new4"), // new rdn |
| | | true, // deleteoldrdn |
| | | TEST_ROOT_DN2); // new superior |
| | |
| | | server01.publish(modDNMsg); |
| | | debugInfo(tn, " publishes " + modDNMsg.getCSN()); |
| | | |
| | | String filter = "(targetdn=*" + tn + "*,o=test)"; |
| | | InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS); |
| | | InternalSearchOperation searchOp = |
| | | searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS); |
| | | |
| | | // test 4 entries returned |
| | | final LDIFWriter ldifWriter = getLDIFWriter(); |
| | |
| | | stop(server01); |
| | | |
| | | // Test with filter on change number |
| | | filter = |
| | | String filter = |
| | | "(&(targetdn=*" + tn + "*,o=test)" |
| | | + "(&(changenumber>=" + firstChangeNumber + ")" |
| | | + "(changenumber<=" + (firstChangeNumber + 3) + ")))"; |
| | |
| | | long firstChangeNumber, int i, String tn, CSN csn) |
| | | { |
| | | final long changeNumber = firstChangeNumber + i; |
| | | final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING; |
| | | final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING; |
| | | |
| | | assertDNEquals(resultEntry, changeNumber); |
| | | checkValue(resultEntry, "changenumber", String.valueOf(changeNumber)); |
| | |
| | | |
| | | private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber) |
| | | { |
| | | String actualDN = resultEntry.getName().toNormalizedString(); |
| | | String expectedDN = "changenumber=" + changeNumber + ",cn=changelog"; |
| | | assertThat(actualDN).isEqualToIgnoringCase(expectedDN); |
| | | final String actualDN = resultEntry.getName().toNormalizedString(); |
| | | final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog"; |
| | | assertThat(actualDN) |
| | | .as("Unexpected DN for entry " + resultEntry) |
| | | .isEqualToIgnoringCase(expectedDN); |
| | | } |
| | | |
| | | private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception |
| | |
| | | while (!cnIndexDB.isEmpty()) |
| | | { |
| | | debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count()); |
| | | Thread.sleep(200); |
| | | Thread.sleep(10); |
| | | } |
| | | |
| | | debugInfo(tn, "Ending test with success"); |
| | |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | | @Mock |
| | | private ReplicationDomainDB domainDB; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors; |
| | | |
| | | private List<DN> eclEnabledDomains; |
| | | private MultiDomainDBCursor multiDomainCursor; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors; |
| | | private Map<DN, DomainDBCursor> domainDBCursors; |
| | | private ChangelogState initialState; |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | | private ChangeNumberIndexer cnIndexer; |
| | |
| | | public void setup() throws Exception |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn( |
| | | multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | { |
| | | startCNIndexer(BASE_DN1); |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | setCNIndexDBInitialRecords(msg1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // simulate messages received out of order |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDifferentDomains() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN2, serverId2); |
| | | startCNIndexer(BASE_DN1, BASE_DN2); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | setCNIndexDBInitialRecords(msg1, msg2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | addReplica(BASE_DN1, serverId3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // cn=admin data will does not participate in the external changelog |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneGoingOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | |
| | | // blocked until we receive info for serverId2 |
| | | assertExternalChangelogContent(); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg2, msg3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // MCP moves forward because serverId1 is not really offline |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneKilled() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | | cursors.put(Pair.of(baseDN, serverId), cursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(cursor); |
| | | final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); |
| | | replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); |
| | | |
| | | if (isECLEnabledDomain2(baseDN)) |
| | | { |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class))) |
| | | .thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | | getDomainNewestCSNs(baseDN)); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | |
| | | return serverState; |
| | | } |
| | | |
| | | private void startCNIndexer(DN... eclEnabledDomains) |
| | | private void startCNIndexer() |
| | | { |
| | | final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains); |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) |
| | | { |
| | | @Override |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | | return eclEnabledDomainList.contains(baseDN); |
| | | return isECLEnabledDomain2(baseDN); |
| | | } |
| | | |
| | | }; |
| | | cnIndexer.start(); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private boolean isECLEnabledDomain2(DN baseDN) |
| | | { |
| | | return eclEnabledDomains.contains(baseDN); |
| | | } |
| | | |
| | | private void stopCNIndexer() throws Exception |
| | | { |
| | | if (cnIndexer != null) |
| | |
| | | final CSN csn = newestMsg.getCSN(); |
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); |
| | | final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | final SequentialDBCursor cursor = |
| | | replicaDBCursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | } |
| | | initialCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | if (msg.isEmptyCursor()) |
| | | { |
| | | cursor.add(null); |
| | |
| | | }; |
| | | } |
| | | |
| | | @Test(dataProvider = "precedingCSNDataProvider") |
| | | public void getPrecedingCSN(CSN start, CSN expected) |
| | | { |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState); |
| | | CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start); |
| | | assertThat(precedingCSN).isEqualTo(expected); |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Collections; |
| | | import java.util.Iterator; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | |
| | | public class CompositeDBCursorTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String> |
| | | { |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | } |
| | | |
| | | @Override |
| | | protected Iterator<String> removedCursorsIterator() |
| | | { |
| | | return Collections.EMPTY_LIST.iterator(); |
| | | } |
| | | } |
| | | |
| | | private UpdateMsg msg1; |
| | | private UpdateMsg msg2; |
| | | private UpdateMsg msg3; |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | // TODO : this test fails because msg2 is returned twice |
| | | @Test(enabled=false) |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, String> cursorsMap = |
| | | new HashMap<DBCursor<UpdateMsg>, String>(); |
| | | final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor(); |
| | | for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs) |
| | | { |
| | | // The cursors in the composite are expected to be pointing |
| | | // to first record available |
| | | pair.getFirst().next(); |
| | | cursorsMap.put(pair.getFirst(), pair.getSecond()); |
| | | cursor.addCursor(pair.getFirst(), pair.getSecond()); |
| | | } |
| | | return new CompositeDBCursor<String>(cursorsMap, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |