OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
This commit only fixes the "new replicas" case.
To do so, I made the existing CompositeDBCursor abstract and implemented two new cursors: DomainDBCursor, which iterates over a single replication domain, and MultiDomainDBCursor, which iterates across replication domains.
These new cursors are able to react to topology changes like a new domain or a new replica.
However, ECL persistent searches cannot take advantage of them right now, because their list of ECLServerHandler.domainCtxts is initialized at the start of the search and never updated afterwards.
Please note that I added new methods to ReplicationDomainDB, but several of them are only called by DomainDBCursor, MultiDomainDBCursor or ChangeNumberIndexer, which are considered internal classes of the changelog DB. So maybe we should find a way to hide them from client code of the changelog DB.
These changes meant I could remove the awkward dual way of using CompositeDBCursor, and I could also remove most of the cursor management from ChangeNumberIndexer. I think I might find a way, in a subsequent commit, to also get rid of ChangeNumberIndexer.replicasOffline.
Unfortunately, the same changes are duplicated in both the JE-based and the file-based changelog implementations.
DomainDBCursor.java, MultiDomainDBCursor.java: ADDED
ReplicationDomainDB.java:
Added getCursorFrom(MultiDomainServerState startAfterState) and unregisterCursor(DBCursor) called by DomainDBCursor and MultiDomainDBCursor.
FileChangelogDB.java, JEChangelogDB.java:
Added registeredDomainCursors and registeredMultiDomainCursors fields.
In getExistingOrNewDomainMap(), updated MultiDomainDBCursors when a new ECL enabled domain is created.
In getCursorFrom(DN baseDN, int serverId, CSN), created the ReplicaOfflineCSN there from getCursorFrom(DN baseDN, ServerState).
Added newDomainDBCursor().
Reworked newOfflineCSN().
Implemented new methods in ReplicationDomainDB.
In getOrCreateReplicaDB(), updated DomainDBCursors when a new replica is created.
Synchronized the two files to ease diffing them together.
ChangeNumberIndexer.java:
Removed the responsibility to manage cursors from this class.
Removed allCursors, newCursors fields.
In publishUpdateMsg(), initialize(), moveForwardMediumConsistencyPoint() and run() removed code that dealt with creating/opening/recreating/removing cursors.
In initialize(), ignored ReplicaOfflineMsgs.
In run(), better handled the removed domains + ignored ReplicaOfflineMsgs.
Removed resetNextChangeForInsertDBCursor(), ensureCursorExists(), removeCursors(), getCursor(), recycleExhaustedCursors(), createNewCursors(), getPrecedingCSN().
Made getPrecedingCSN() public static.
Added logUnexpectedException().
CompositeDBCursor.java:
Now abstract.
Removed ctor.
Removed recycleExhaustedCursors field.
Added incorporateNewCursors(), removedCursorsIterator() and addCursor() + used them in next().
In next(), extracted recycleExhaustedCursors() + used newly added removeNoLongerNeededCursors().
In close(), completed code.
ChangeNumberIndexerTest.java:
Consequence of the changes to ChangeNumberIndexer.
Renamed cursors field to replicaDBCursors.
Added multiDomainCursor, domainDBCursors fields.
Added eclEnabledDomains field to separate it from startCNIndexer().
Changed setup() and addReplica().
CompositeDBCursorTest.java:
Consequence of the change to CompositeDBCursor.
ExternalChangeLogTest.java:
Code cleanup.
Extracted method readMessages() to factorize code.
Added method assertLastCookieDifferentThanLastValue() to loop until the last cookie is updated.
Added inner class Results.
2 files added
7 files modified
| | |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | */ |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param startAfterState |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDB |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param startAfterServerState |
| | | * @param startAfterState |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Unregisters the provided cursor from this replication domain. |
| | | * |
| | | * @param cursor |
| | | * the cursor to unregister. |
| | | */ |
| | | void unregisterCursor(DBCursor<?> cursor); |
| | | |
| | | /** |
| | | * Publishes the provided change to the changelog DB for the specified |
| | | * serverId and replication domain. After a change has been successfully |
| | | * published, it becomes available to be returned by the External ChangeLog. |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | */ |
| | |
| | | * |
| | | * @NonNull |
| | | */ |
| | | @SuppressWarnings("unchecked") |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor = |
| | | new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | | * i.e. from the same thread that will make use of them. If this rule is not |
| | | * obeyed, then a JE exception will be thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** |
| | | * Holds the newCursors that will have to be created in the next iteration |
| | | * inside the {@link #run()} method. |
| | | * <p> |
| | | * This map can be updated by multiple threads. |
| | | */ |
| | | private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( |
| | | new Comparator<Pair<DN, Integer>>() |
| | | { |
| | | @Override |
| | | public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2) |
| | | { |
| | | final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); |
| | | if (compareBaseDN == 0) |
| | | { |
| | | return o1.getSecond().compareTo(o2.getSecond()); |
| | | } |
| | | return compareBaseDN; |
| | | } |
| | | }); |
| | | private MultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | return; |
| | | } |
| | | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | lastAliveCSNs.update(baseDN, updateMsg.getCSN()); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | continue; |
| | | } |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | } |
| | | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | final UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | if (record instanceof ReplicaOfflineMsg) |
| | | { |
| | | // ignore: replica offline messages are never stored in the CNIndexDB |
| | | nextChangeForInsertDBCursor.next(); |
| | | record = nextChangeForInsertDBCursor.getRecord(); |
| | | } |
| | | |
| | | // sanity check: ensure that when initializing the cursors at the previous |
| | | // cookie, the next change we find is the newest record in the CNIndexDB |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord |
| | | .getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get( |
| | | newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | } |
| | | // Now we can update the mediumConsistencyRUV |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | |
| | | return new CSN(0, 0, serverId); |
| | | } |
| | | |
| | | private void resetNextChangeForInsertDBCursor() throws ChangelogException |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, DN> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry |
| | | : this.allCursors.entrySet()) |
| | | { |
| | | for (Entry<Integer, DBCursor<UpdateMsg>> entry2 |
| | | : entry.getValue().entrySet()) |
| | | { |
| | | cursors.put(entry2.getValue(), entry.getKey()); |
| | | } |
| | | } |
| | | |
| | | // CNIndexer manages the cursor itself, |
| | | // so do not try to recycle exhausted cursors |
| | | CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false); |
| | | result.next(); |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | | { |
| | | map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>(); |
| | | allCursors.put(baseDN, map); |
| | | } |
| | | DBCursor<UpdateMsg> cursor = map.get(serverId); |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | cursor.next(); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Returns the immediately preceding CSN. |
| | | * |
| | | * @param csn |
| | | * the CSN to use |
| | | * @return the immediately preceding CSN or null if the provided CSN is null. |
| | | */ |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn == null) |
| | | { |
| | | return null; |
| | | } |
| | | if (csn.getSeqnum() > 0) |
| | | { |
| | | return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); |
| | | } |
| | | return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | |
| | | { |
| | | /* |
| | | * initialize here to allow fast application start up and avoid errors due to |
| | | * cursors being created in a different thread to the one where they are |
| | | * used. |
| | | * cursors being created in a different thread to the one where they are used. |
| | | */ |
| | | initialize(); |
| | | |
| | |
| | | { |
| | | if (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN cursorData = nextChangeForInsertDBCursor.getData(); |
| | | final boolean callNextOnCursor = |
| | | cursorData == null || domainsToClear.contains(cursorData); |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | removeCursors(baseDNToClear); |
| | | nextChangeForInsertDBCursor.removeDomain(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | else |
| | | { |
| | | final boolean createdCursors = createNewCursors(); |
| | | final boolean recycledCursors = recycleExhaustedCursors(); |
| | | if (createdCursors || recycledCursors) |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | // The next change to consume comes from a domain to be removed. |
| | | // Call DBCursor.next() to ensure this domain is removed |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | // Do not call DBCursor.next() here |
| | | // because we might not have consumed the last record, |
| | | // for example if we could not move the MCP forward |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | // check whether new changes have been added to the ReplicaDBs |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | else if (msg instanceof ReplicaOfflineMsg) |
| | | { |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // LocalizableMessage logged here gives corrective information to the administrator. |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // LocalizableMessage logged here gives corrective information to the administrator. |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw new RuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | removeCursors(DN.NULL_DN); |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Logs an unexpected exception; nothing else can be done about it. |
| | | * <p> |
| | | * Rely on the DirectoryThread uncaught exceptions handler for logging error + |
| | | * alert. |
| | | * <p> |
| | | * Message logged here gives corrective information to the administrator. |
| | | */ |
| | | private void logUnexpectedException(Exception e) |
| | | { |
| | | logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION, |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | boolean callNextOnCursor = true; |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | pair = getCursor(mcBaseDN, mcCSN.getServerId()); |
| | | Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond(); |
| | | if (iter != null && !iter.hasNext()) |
| | | { |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | iter.remove(); |
| | | StaticUtils.close(pair.getFirst()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | callNextOnCursor = false; |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | // TODO JNR how to close cursor for offline replica? |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | private void removeCursors(DN baseDN) |
| | | { |
| | | if (nextChangeForInsertDBCursor != null) |
| | | { |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | if (DN.NULL_DN.equals(baseDN)) |
| | | { |
| | | // close all cursors |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | } |
| | | else |
| | | { |
| | | // close cursors for this DN |
| | | final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN); |
| | | if (map != null) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();) |
| | | { |
| | | if (it.next().getFirst().equals(baseDN)) |
| | | { |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | getCursor(final DN baseDN, final int serverId) throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | | { |
| | | if (baseDN.equals(entry1.getKey())) |
| | | { |
| | | for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (serverId == entry2.getKey()) |
| | | { |
| | | return Pair.of(entry2.getValue(), iter); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return Pair.empty(); |
| | | } |
| | | |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | boolean succesfullyRecycled = false; |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : map.values()) |
| | | { |
| | | // try to recycle it by calling next() |
| | | if (cursor.getRecord() == null && cursor.next()) |
| | | { |
| | | succesfullyRecycled = true; |
| | | } |
| | | } |
| | | } |
| | | return succesfullyRecycled; |
| | | } |
| | | |
| | | private boolean createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = |
| | | newCursors.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | // start after preceding CSN so the first CSN read will exactly be the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | | iter.remove(); |
| | | } |
| | | return newCursorAdded; |
| | | } |
| | | return false; |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | * <p> |
| | | * New cursors for this Map must be created from the same thread that will |
| | | * make use of them. When this rule is not obeyed, a JE exception will be |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | private final TreeMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | |
| | | // If the previous state was ready, then we must advance the first cursor |
| | | // (whose UpdateMsg has been consumed). |
| | | // To keep the cursors' order consistent in the sorted map, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = |
| | | state != UNINITIALIZED ? cursors.pollFirstEntry() : null; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | recycleExhaustedCursors(); |
| | | if (cursorToAdvance != null) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. |
| | | addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue()); |
| | | } |
| | | |
| | | removeNoLongerNeededCursors(); |
| | | incorporateNewCursors(); |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | if (!exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle exhausted cursors in case the underlying replica DBs received new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | { |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); |
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey())) |
| | | { |
| | | // if the first cursor was previously an exhausted cursor, |
| | | // then we have already called next() on it. |
| | | // Avoid calling it again because we know new changes have been found. |
| | | return true; |
| | | addCursor(entry.getKey(), entry.getValue()); |
| | | } |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and add again the cursor after moving it forward. |
| | | if (advanceNonExhaustedCursors) |
| | | { |
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); |
| | | if (firstEntry != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); |
| | | cursor.next(); |
| | | put(firstEntry); |
| | | } |
| | | } |
| | | // no cursors are left with changes. |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | private void removeNoLongerNeededCursors() |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | final Data data = entry.getValue(); |
| | | if (cursor.getRecord() != null) |
| | | for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();) |
| | | { |
| | | final Data dataToFind = iter.next(); |
| | | for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter = |
| | | cursors.entrySet().iterator(); cursorIter.hasNext();) |
| | | { |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next(); |
| | | if (dataToFind.equals(entry.getValue())) |
| | | { |
| | | entry.getKey().close(); |
| | | cursorIter.remove(); |
| | | } |
| | | } |
| | | iter.remove(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns an Iterator over the data associated to cursors that must be removed. |
| | | * |
| | | * @return an Iterator over the data associated to cursors that must be removed. |
| | | */ |
| | | protected abstract Iterator<Data> removedCursorsIterator(); |
| | | |
| | | /** |
| | | * Adds a cursor to this composite cursor. It first calls |
| | | * {@link DBCursor#next()} to verify whether it is exhausted or not. |
| | | * |
| | | * @param cursor |
| | | * the cursor to add to this composite |
| | | * @param data |
| | | * the data associated to the provided cursor |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | | this.cursors.put(cursor, data); |
| | | } |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | // Cannot call incorporateNewCursors() here because |
| | | // somebody might have already called DBCursor.getRecord() and read the record |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called when implementors should incorporate new cursors into the current |
| | | * composite DBCursor. Implementors should call |
| | | * {@link #addCursor(DBCursor, Object)} to do so. |
| | | * |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | protected abstract void incorporateNewCursors() throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | |
| | | @Override |
| | | public void close() |
| | | { |
| | | // flag this composite cursor as closed before releasing the underlying cursors |
| | | state = CLOSED; |
| | | // close every cursor held in either map |
| | | StaticUtils.close(cursors.keySet()); |
| | | StaticUtils.close(exhaustedCursors.keySet()); |
| | | // drop the references to the cursors that were just closed |
| | | cursors.clear(); |
| | | exhaustedCursors.clear(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Cursor iterating over a replication domain's replica DBs. |
| | | * <p> |
| | | * Replicas registered through {@link #addReplicaDB(int, CSN)} are kept in a |
| | | * pending map and are only turned into actual cursors when |
| | | * {@link #incorporateNewCursors()} is invoked. |
| | | */ |
| | | public class DomainDBCursor extends CompositeDBCursor<Void> |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | /** Pending replicas: serverId => CSN after which the new cursor must start. */ |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = |
| | | new ConcurrentSkipListMap<Integer, CSN>(); |
| | | /** |
| | | * Placeholder standing for a null CSN, because ConcurrentSkipListMap does not |
| | | * support null values. |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | */ |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication domain baseDN of this cursor. |
| | | * |
| | | * @return the replication domain baseDN of this cursor. |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Adds a replicaDB for this cursor to iterate over. Added cursors will be |
| | | * created and iterated over on the next call to {@link #next()}. |
| | | * |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startAfterCSN |
| | | * the CSN after which to start iterating, may be null |
| | | */ |
| | | public void addReplicaDB(int serverId, CSN startAfterCSN) |
| | | { |
| | | // putIfAbsent keeps the first CSN registered for this replica: |
| | | // later registrations are ignored while an entry is still pending |
| | | newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, CSN> pair = iter.next(); |
| | | final int serverId = pair.getKey(); |
| | | final CSN csn = pair.getValue(); |
| | | // translate the placeholder back to the null CSN it stands for |
| | | final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | addCursor(cursor, null); |
| | | // the replica is only removed from the pending map once its cursor was added |
| | | iter.remove(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected Iterator<Void> removedCursorsIterator() |
| | | { |
| | | // nothing to remove: the typed empty list avoids the raw Collections.EMPTY_LIST |
| | | return Collections.<Void> emptyList().iterator(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | super.close(); |
| | | domainDB.unregisterCursor(this); |
| | | newReplicas.clear(); |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a JEReplicaDB, synchronize on the domainMap to avoid |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServerCfg config; |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new DBCursor<UpdateMsg>() |
| | | { |
| | | |
| | |
| | | }; |
| | | |
| | | /** |
| | | * Builds an instance of this class. |
| | | * Creates a new changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public JEChangelogDB(ReplicationServer replicationServer, |
| | | ReplicationServerCfg config) throws ConfigException |
| | | public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | { |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | | private File makeDir(final String dbDirName) throws ConfigException |
| | | { |
| | | // Check that this path exists or create it. |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | |
| | | { |
| | | logger.traceException(e); |
| | | |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); |
| | | mb.append(e.getLocalizedMessage()); |
| | | mb.append(" "); |
| | | mb.append(dbDirectory); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e); |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( |
| | | e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | | |
| | | private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN) |
| | | private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN) |
| | | { |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private JEReplicaDB getReplicaDB(DN baseDN, int serverId) |
| | | private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId) |
| | | { |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Provision resources for the specified serverId in the specified replication |
| | | * domain. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain where to add the serverId |
| | | * @param serverId |
| | | * the server Id to add to the replication domain |
| | | * @throws ChangelogException |
| | | * If a database error happened. |
| | | */ |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateReplicaDB(baseDN, serverId, rs); |
| | | } |
| | | |
| | | /** |
| | | * Returns a {@link JEReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer server) throws ChangelogException |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, |
| | | final ReplicationServer server) throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | { |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = |
| | | getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | final Boolean dbWasCreated = result.getSecond(); |
| | | if (dbWasCreated) |
| | | { // new replicaDB => update all cursors with it |
| | | // registeredDomainCursors is a plain HashMap of ArrayLists guarded by itself: |
| | | // look up and iterate under its lock so a concurrent registration |
| | | // or unregistration cannot corrupt the traversal |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (DomainDBCursor cursor : cursors) |
| | | { |
| | | cursor.addReplicaDB(serverId, null); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return result; |
| | | } |
| | | // result is null: loop again unless a shutdown has been requested |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | } |
| | | |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap( |
| | | DN baseDN) |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN) |
| | | { |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = |
| | | domainToReplicaDBs.get(baseDN); |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another |
| | | // thread |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = |
| | | new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = |
| | | domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | if (MultimasterReplication.isECLEnabledDomain(baseDN)) |
| | | { |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer server) throws ChangelogException |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists |
| | | // happy path: the replicaDB already exists |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the JEReplicaDB does not exist: take the hit and synchronize |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv); |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | | commission(entry.getKey(), serverId, replicationServer); |
| | | getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void shutdownCNIndexDB() throws ChangelogException |
| | | private void shutdownChangeNumberIndexDB() throws ChangelogException |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | if (dbEnv != null) |
| | | if (replicationEnv != null) |
| | | { |
| | | // wait for shutdown of the threads holding cursors |
| | | try |
| | |
| | | // do nothing: we are already shutting down |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | | if (firstException != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears all content from the changelog database, but leaves its directory on |
| | | * the filesystem. |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // 3- clear the changelogstate DB |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDN); |
| | | replicationEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | { |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | for (CSN offlineCSN : domainState) |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | cursors = new ArrayList<DomainDBCursor>(); |
| | | registeredDomainCursors.put(baseDN, cursors); |
| | | } |
| | | cursors.add(cursor); |
| | | return cursor; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Looks up the offline CSN recorded in the changelog state for a replica. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain of the replica |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startAfterCSN |
| | | * the CSN after which a cursor starts, may be null |
| | | * @return the recorded offline CSN, or null when none is recorded or when |
| | | * startAfterCSN is not older than it |
| | | */ |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final CSN candidate = |
| | | replicationEnv.getChangelogState().getOfflineReplicas().getCSN(baseDN, serverId); |
| | | if (candidate == null) |
| | | { |
| | | // no offline CSN recorded for this replica |
| | | return null; |
| | | } |
| | | if (startAfterCSN != null && !startAfterCSN.isOlderThan(candidate)) |
| | | { |
| | | // the starting point is not older than the offline CSN |
| | | return null; |
| | | } |
| | | return candidate; |
| | | } |
| | |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | | } |
| | | return EMPTY_CURSOR; |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | | { |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | updateMsg.getCSN().getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | if (cursor instanceof MultiDomainDBCursor) |
| | | { |
| | | // registeredMultiDomainCursors is a CopyOnWriteArrayList: no extra locking here |
| | | registeredMultiDomainCursors.remove(cursor); |
| | | } |
| | | else if (cursor instanceof DomainDBCursor) |
| | | { |
| | | final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | | // registeredDomainCursors is guarded by itself: take the same lock as |
| | | // newDomainDBCursor(), not the one of the unrelated multi domain list |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | // any other kind of cursor was never registered: nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | csn.getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | replicaDB.add(updateMsg); |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | | return pair.getSecond(); // replica DB was created |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | dbEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | replicationEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Cursor iterating over all the replication domains added to it. |
| | | * <p> |
| | | * Domains added or removed through {@link #addDomain(DN, ServerState)} and |
| | | * {@link #removeDomain(DN)} are recorded first and handed over to the parent |
| | | * class through {@link #incorporateNewCursors()} and |
| | | * {@link #removedCursorsIterator()}. |
| | | */ |
| | | public class MultiDomainDBCursor extends CompositeDBCursor<DN> |
| | | { |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | /** Domains waiting to be incorporated: baseDN => state after which to start. */ |
| | | private final ConcurrentSkipListMap<DN, ServerState> newDomains = |
| | | new ConcurrentSkipListMap<DN, ServerState>(); |
| | | /** Domains waiting to be removed from this cursor. */ |
| | | private final ConcurrentSkipListSet<DN> domainsToRemove = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | |
| | | /** |
| | | * Creates a cursor that obtains its per domain cursors from the provided DB. |
| | | * |
| | | * @param domainDB |
| | | * the replication domain management DB |
| | | */ |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB) |
| | | { |
| | | this.domainDB = domainDB; |
| | | } |
| | | |
| | | /** |
| | | * Registers a replication domain that this cursor must iterate over. The |
| | | * matching cursor is created when new cursors are incorporated, on the next |
| | | * call to {@link #next()}. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | * @param startAfterState |
| | | * the {@link ServerState} after which to start iterating; when null |
| | | * a new empty {@link ServerState} is used instead |
| | | */ |
| | | public void addDomain(DN baseDN, ServerState startAfterState) |
| | | { |
| | | ServerState state = startAfterState; |
| | | if (state == null) |
| | | { |
| | | state = new ServerState(); |
| | | } |
| | | newDomains.put(baseDN, state); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | final Iterator<Entry<DN, ServerState>> pending = newDomains.entrySet().iterator(); |
| | | while (pending.hasNext()) |
| | | { |
| | | final Entry<DN, ServerState> domain = pending.next(); |
| | | final DN domainDN = domain.getKey(); |
| | | final ServerState startAfter = domain.getValue(); |
| | | addCursor(domainDB.getCursorFrom(domainDN, startAfter), domainDN); |
| | | // only forget the domain once its cursor has been added |
| | | pending.remove(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Records that a replication domain must no longer be iterated over. The |
| | | * removal becomes effective on the next call to {@link #next()}. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | */ |
| | | public void removeDomain(DN baseDN) |
| | | { |
| | | domainsToRemove.add(baseDN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected Iterator<DN> removedCursorsIterator() |
| | | { |
| | | return domainsToRemove.iterator(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | super.close(); |
| | | domainDB.unregisterCursor(this); |
| | | newDomains.clear(); |
| | | domainsToRemove.clear(); |
| | | } |
| | | |
| | | } |
| | |
| | | public class ExternalChangeLogTest extends ReplicationTestCase |
| | | { |
| | | |
| | | /** |
| | | * Holder grouping a list of search result entries with two counters. |
| | | * NOTE(review): presumably filled while reading the responses of a search, |
| | | * confirm against the code using it. |
| | | */ |
| | | private static class Results |
| | | { |
| | | |
| | | public final List<SearchResultEntryProtocolOp> searchResultEntries = |
| | | new ArrayList<SearchResultEntryProtocolOp>(); |
| | | public long searchReferences; |
| | | public long searchesDone; |
| | | |
| | | } |
| | | |
| | | private static final int SERVER_ID_1 = 1201; |
| | | private static final int SERVER_ID_2 = 1202; |
| | | |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "PrimaryTest"}) |
| | | public void TestWithAndWithoutControl() throws Exception |
| | | { |
| | | final String tn = "TestWithAndWithoutControl"; |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | ECLCompatWriteReadAllOps(1, tn); |
| | | |
| | | ECLCompatNoControl(1); |
| | | |
| | | // Write additional changes and read ECL from a provided change number |
| | | ECLCompatWriteReadAllOps(5); |
| | | ECLCompatWriteReadAllOps(5, tn); |
| | | } |
| | | |
| | | @Test(enabled=false, dependsOnMethods = { "PrimaryTest"}) |
| | |
| | | @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"}) |
| | | public void ECLReplicationServerFullTest15() throws Exception |
| | | { |
| | | final String tn = "ECLReplicationServerFullTest15"; |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write 4 changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | ECLCompatWriteReadAllOps(1, tn); |
| | | |
| | | // Write 4 additional changes and read ECL from a provided change number |
| | | CSN csn = ECLCompatWriteReadAllOps(5); |
| | | CSN csn = ECLCompatWriteReadAllOps(5, tn); |
| | | |
| | | // Test request from a provided change number - read 6 |
| | | ECLCompatReadFrom(6, csn); |
| | |
| | | |
| | | final CSN[] csns = generateCSNs(3, SERVER_ID_1); |
| | | publishDeleteMsgInOTest(server01, csns[0], testName, 1); |
| | | |
| | | Thread.sleep(1000); |
| | | |
| | | // Test that last cookie has been updated |
| | | String cookieNotEmpty = readLastCookie(); |
| | | debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\""); |
| | | |
| | | final String firstCookie = assertLastCookieDifferentThanLastValue(""); |
| | | String lastCookie = firstCookie; |
| | | publishDeleteMsgInOTest(server01, csns[1], testName, 2); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | publishDeleteMsgInOTest(server01, csns[2], testName, 3); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | |
| | | // --- |
| | | // 2. Now set up a very short purge delay on the replication changelogs |
| | |
| | | // returns the appropriate error. |
| | | debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN)); |
| | | debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2)); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM); |
| | | assertTrue(searchOp.getErrorMessage().toString().startsWith( |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()), |
| | | searchOp.getErrorMessage().toString()); |
| | |
| | | |
| | | final CSN[] csns = generateCSNs(3, SERVER_ID_1); |
| | | publishDeleteMsgInOTest(server01, csns[0], testName, 1); |
| | | |
| | | Thread.sleep(1000); |
| | | |
| | | // Test that last cookie has been updated |
| | | String cookieNotEmpty = readLastCookie(); |
| | | debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\""); |
| | | |
| | | final String firstCookie = assertLastCookieDifferentThanLastValue(""); |
| | | String lastCookie = firstCookie; |
| | | publishDeleteMsgInOTest(server01, csns[1], testName, 2); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | publishDeleteMsgInOTest(server01, csns[2], testName, 3); |
| | | lastCookie = assertLastCookieDifferentThanLastValue(lastCookie); |
| | | |
| | | // --- |
| | | // 2. Now remove the domain by sending a reset message |
| | | ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657); |
| | | server01.publish(msg); |
| | | server01.publish(new ResetGenerationIdMsg(23657)); |
| | | |
| | | // --- |
| | | // 3. Assert that a request with an empty cookie returns nothing |
| | | // since replication changelog has been cleared |
| | | String cookie= ""; |
| | | InternalSearchOperation searchOp = null; |
| | | searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | |
| | | // --- |
| | |
| | | // since replication changelog has been cleared |
| | | cookie = readLastCookie(); |
| | | debugInfo(testName, "2. Search with last cookie=" + cookie + "\""); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS); |
| | | |
| | | // --- |
| | | // 5. Assert that a request with an "old" cookie - one that refers to |
| | |
| | | // returns the appropriate error. |
| | | debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN)); |
| | | debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2)); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM); |
| | | final InternalSearchOperation searchOp = |
| | | searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM); |
| | | assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString()); |
| | | } |
| | | finally |
| | |
| | | debugInfo(testName, "Ending test successfully"); |
| | | } |
| | | |
| | | /** |
| | | * Polls readLastCookie() until it returns a value different from the supplied one. |
| | | * Performs at most 100 reads, sleeping 10 ms after each read that is still unchanged. |
| | | * |
| | | * @param lastCookie the cookie value that is expected to be replaced |
| | | * @return the first cookie read that is not equal to {@code lastCookie} |
| | | * @throws Exception if reading the cookie fails or the sleep is interrupted |
| | | */ |
| | | private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception |
| | | { |
| | | for (int attempt = 0; attempt < 100; attempt++) |
| | | { |
| | | final String current = readLastCookie(); |
| | | if (current.equals(lastCookie)) |
| | | { |
| | | // still the old value: wait a little before reading again |
| | | Thread.sleep(10); |
| | | continue; |
| | | } |
| | | return current; |
| | | } |
| | | Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'"); |
| | | return null;// dead code |
| | | } |
| | | |
| | | private void debugAndWriteEntries(LDIFWriter ldifWriter, |
| | | List<SearchResultEntry> entries, String tn) throws Exception |
| | | { |
| | |
| | | |
| | | // Publish ADD |
| | | csnCounter++; |
| | | String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" + "objectClass: domain\n" |
| | | + "entryUUID: "+user1entryUUID+"\n"; |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | Entry entry = TestCaseUtils.entryFromLdifString( |
| | | "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryUUID: " + user1entryUUID + "\n"); |
| | | AddMsg addMsg = new AddMsg( |
| | | csns[csnCounter], |
| | | DN.valueOf("uid="+tn+"2," + TEST_ROOT_DN_STRING), |
| | |
| | | |
| | | InvocationCounterPlugin.resetAllCounters(); |
| | | |
| | | long searchEntries; |
| | | long searchReferences = ldapStatistics.getSearchResultReferences(); |
| | | long searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | final Results results = new Results(); |
| | | results.searchReferences = ldapStatistics.getSearchResultReferences(); |
| | | results.searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | |
| | | debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)"); |
| | | LDAPMessage message = new LDAPMessage(2, searchRequest, controls); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(2, searchRequest, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | if (!changesOnly) |
| | | { |
| | | // Wait for change 1 |
| | | debugInfo(tn, "Waiting for init search expected to return change 1"); |
| | | searchEntries = 0; |
| | | readMessages(tn, r, results, 1, "Init search Result="); |
| | | for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries) |
| | | { |
| | | while (searchEntries < 1 && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Init search Result=" + |
| | | message.getProtocolOpType() + message + " " + searchEntries); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | // FIXME:ECL Double check 1 is really the valid value here. |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"1":"0")); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | // FIXME:ECL Double check 1 is really the valid value here. |
| | | final String cn = compatMode ? "1" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "INIT search done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | |
| | | // Produces change 2 |
| | |
| | | " published , psearch will now wait for new entries"); |
| | | |
| | | // wait for the 1 new entry |
| | | searchEntries = 0; |
| | | SearchResultEntryProtocolOp searchResultEntry = null; |
| | | while (searchEntries < 1 && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search Result=" + |
| | | message.getProtocolOpType() + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchResultEntry = message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r, results, 1, "psearch search Result="); |
| | | SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0); |
| | | Thread.sleep(1000); |
| | | |
| | | // Check we received change 2 |
| | |
| | | createSearchRequest("(targetDN=*directpsearch*,o=test)", null); |
| | | |
| | | debugInfo(tn, "ACI test : sending search"); |
| | | message = new LDAPMessage(2, searchRequest, createCookieControl("")); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl(""))); |
| | | |
| | | searchesDone=0; |
| | | searchEntries = 0; |
| | | LDAPMessage message; |
| | | int searchesDone = 0; |
| | | int searchEntries = 0; |
| | | int searchReferences = 0; |
| | | while ((searchesDone==0) && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "ACI test : message returned " + |
| | |
| | | |
| | | InvocationCounterPlugin.resetAllCounters(); |
| | | |
| | | ldapStatistics.getSearchRequests(); |
| | | long searchEntries = ldapStatistics.getSearchResultEntries(); |
| | | ldapStatistics.getSearchResultReferences(); |
| | | long searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | final Results results = new Results(); |
| | | results.searchesDone = ldapStatistics.getSearchResultsDone(); |
| | | |
| | | LDAPMessage message; |
| | | message = new LDAPMessage(2, searchRequest1, controls); |
| | | w1.writeMessage(message); |
| | | w1.writeMessage(new LDAPMessage(2, searchRequest1, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | message = new LDAPMessage(2, searchRequest2, controls); |
| | | w2.writeMessage(message); |
| | | w2.writeMessage(new LDAPMessage(2, searchRequest2, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | message = new LDAPMessage(2, searchRequest3, controls); |
| | | w3.writeMessage(message); |
| | | w3.writeMessage(new LDAPMessage(2, searchRequest3, controls)); |
| | | Thread.sleep(500); |
| | | |
| | | if (!changesOnly) |
| | | { |
| | | debugInfo(tn, "Search1 Persistent filter=" + searchRequest1.getFilter() |
| | | + " expected to return change " + csn1); |
| | | searchEntries = 0; |
| | | message = null; |
| | | |
| | | { |
| | | while (searchEntries < 1 && (message = r1.readMessage()) != null) |
| | | readMessages(tn, r1, results, 1, "Search1 Result="); |
| | | final int searchEntries = results.searchResultEntries.size(); |
| | | if (searchEntries == 1) |
| | | { |
| | | debugInfo(tn, "Search1 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | if (searchEntries==1) |
| | | { |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString()); |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"10":"0")); |
| | | } |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | // readMessages() was asked for one entry and the guard above confirms exactly one |
| | | // was collected, so it sits at index 0; get(1) would throw IndexOutOfBoundsException. |
| | | final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0); |
| | | final String cn = compatMode ? "10" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString()); |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "Search1 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | debugInfo(tn, "Search1 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | searchEntries = 0; |
| | | message = null; |
| | | { |
| | | debugInfo(tn, "Search 2 Persistent filter=" + searchRequest2.getFilter() |
| | | + " expected to return change " + csn2 + " & " + csn3); |
| | | while (searchEntries < 2 && (message = r2.readMessage()) != null) |
| | | readMessages(tn, r2, results, 2, "Search 2 Result="); |
| | | for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries) |
| | | { |
| | | debugInfo(tn, "Search 2 Result=" + |
| | | message.getProtocolOpType() + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", |
| | | (compatMode?"10":"0")); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | final String cn = compatMode ? "10" : "0"; |
| | | checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn); |
| | | } |
| | | debugInfo(tn, "Search2 done with success. searchEntries=" |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | debugInfo(tn, "Search2 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | |
| | | searchEntries = 0; |
| | | message = null; |
| | | { |
| | | debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter() |
| | | + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3); |
| | | while (searchEntries < 4 && (message = r3.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Search3 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter() |
| | | + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3); |
| | | readMessages(tn, r3, results, 4, "Search3 Result="); |
| | | debugInfo(tn, "Search3 done with success. searchEntries=" |
| | | + searchEntries + " #searchesDone="+ searchesDone); |
| | | |
| | | + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone); |
| | | } |
| | | |
| | | // Produces additional change |
| | |
| | | debugInfo(tn, delMsg13.getCSN() + " published additionally "); |
| | | |
| | | // wait 11 |
| | | searchEntries = 0; |
| | | message = null; |
| | | while (searchEntries < 1 && (message = r1.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "Search 11 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r1, results, 1, "Search 11 Result="); |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Search 1 successfully receives additional changes"); |
| | | |
| | | // wait 12 & 13 |
| | | searchEntries = 0; |
| | | message = null; |
| | | while (searchEntries < 2 && (message = r2.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search 12 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r2, results, 2, "psearch search 12 Result="); |
| | | Thread.sleep(1000); |
| | | debugInfo(tn, "Search 2 successfully receives additional changes"); |
| | | |
| | | // wait 11 & 12 & 13 |
| | | searchEntries = 0; |
| | | SearchResultEntryProtocolOp searchResultEntry = null; |
| | | message = null; |
| | | while (searchEntries < 3 && (message = r3.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, "psearch search 13 Result=" + |
| | | message.getProtocolOpType() + " " + message); |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | searchResultEntry = message.getSearchResultEntryProtocolOp(); |
| | | searchEntries++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | readMessages(tn, r3, results, 3, "psearch search 13 Result="); |
| | | SearchResultEntryProtocolOp searchResultEntry = |
| | | results.searchResultEntries.get(results.searchResultEntries.size() - 1); |
| | | Thread.sleep(1000); |
| | | |
| | | // Check we received change 13 |
| | |
| | | debugInfo(tn, "Ends test successfully"); |
| | | } |
| | | |
| | | private void readMessages(String tn, org.opends.server.tools.LDAPReader r, |
| | | final Results results, final int i, final String string) throws Exception |
| | | { |
| | | results.searchResultEntries.clear(); |
| | | |
| | | LDAPMessage message; |
| | | while (results.searchResultEntries.size() < i |
| | | && (message = r.readMessage()) != null) |
| | | { |
| | | debugInfo(tn, string + message.getProtocolOpType() + " " + message); |
| | | |
| | | switch (message.getProtocolOpType()) |
| | | { |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: |
| | | results.searchResultEntries.add(message.getSearchResultEntryProtocolOp()); |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: |
| | | results.searchReferences++; |
| | | break; |
| | | |
| | | case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: |
| | | assertSuccessful(message); |
| | | results.searchesDone++; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void assertSuccessful(LDAPMessage message) |
| | | { |
| | | SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp(); |
| | |
| | | new BindRequestProtocolOp( |
| | | ByteString.valueOf(bindDN), |
| | | 3, ByteString.valueOf(password)); |
| | | LDAPMessage message = new LDAPMessage(1, bindRequest); |
| | | w.writeMessage(message); |
| | | w.writeMessage(new LDAPMessage(1, bindRequest)); |
| | | |
| | | message = r.readMessage(); |
| | | final LDAPMessage message = r.readMessage(); |
| | | BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp(); |
| | | // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); |
| | | assertEquals(bindResponse.getResultCode(), expected); |
| | |
| | | debugInfo(tn, "Ending test successfully"); |
| | | } |
| | | |
| | | private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception |
| | | private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception |
| | | { |
| | | String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber; |
| | | String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | LDAPReplicationDomain domain = null; |
| | | try |
| | |
| | | CSN[] csns = generateCSNs(4, SERVER_ID_1); |
| | | |
| | | // Publish DEL |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID); |
| | | DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID); |
| | | server01.publish(delMsg); |
| | | debugInfo(tn, " publishes " + delMsg.getCSN()); |
| | | |
| | | // Publish ADD |
| | | String lentry = |
| | | "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n" |
| | | Entry entry = TestCaseUtils.entryFromLdifString( |
| | | "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryUUID: "+user1entryUUID+"\n"; |
| | | Entry entry = TestCaseUtils.entryFromLdifString(lentry); |
| | | + "entryUUID: " + user1entryUUID + "\n"); |
| | | AddMsg addMsg = new AddMsg( |
| | | csns[1], |
| | | entry.getName(), |
| | |
| | | debugInfo(tn, " publishes " + addMsg.getCSN()); |
| | | |
| | | // Publish MOD |
| | | DN baseDN = DN.valueOf("uid="+tn+"3," + TEST_ROOT_DN_STRING); |
| | | DN baseDN = DN.valueOf("uid="+tn+"-3," + TEST_ROOT_DN_STRING); |
| | | List<Modification> mods = createMods("description", "new value"); |
| | | ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID); |
| | | server01.publish(modMsg); |
| | |
| | | |
| | | // Publish modDN |
| | | ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null, |
| | | DN.valueOf("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN |
| | | DN.valueOf("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN |
| | | RDN.decode("uid="+tn+"new4"), // new rdn |
| | | true, // deleteoldrdn |
| | | TEST_ROOT_DN2); // new superior |
| | |
| | | server01.publish(modDNMsg); |
| | | debugInfo(tn, " publishes " + modDNMsg.getCSN()); |
| | | |
| | | String filter = "(targetdn=*" + tn + "*,o=test)"; |
| | | InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS); |
| | | InternalSearchOperation searchOp = |
| | | searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS); |
| | | |
| | | // test 4 entries returned |
| | | final LDIFWriter ldifWriter = getLDIFWriter(); |
| | |
| | | stop(server01); |
| | | |
| | | // Test with filter on change number |
| | | filter = |
| | | String filter = |
| | | "(&(targetdn=*" + tn + "*,o=test)" |
| | | + "(&(changenumber>=" + firstChangeNumber + ")" |
| | | + "(changenumber<=" + (firstChangeNumber + 3) + ")))"; |
| | |
| | | long firstChangeNumber, int i, String tn, CSN csn) |
| | | { |
| | | final long changeNumber = firstChangeNumber + i; |
| | | final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING; |
| | | final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING; |
| | | |
| | | assertDNEquals(resultEntry, changeNumber); |
| | | checkValue(resultEntry, "changenumber", String.valueOf(changeNumber)); |
| | |
| | | |
| | | /** |
| | | * Asserts that the normalized DN of the entry is |
| | | * {@code changenumber=<changeNumber>,cn=changelog}, ignoring case. |
| | | * |
| | | * @param resultEntry the changelog entry whose DN is checked |
| | | * @param changeNumber the change number expected in the entry DN |
| | | */ |
| | | private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber) |
| | | { |
| | | final String actualDN = resultEntry.getName().toNormalizedString(); |
| | | final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog"; |
| | | // the description puts the offending entry into the failure message |
| | | assertThat(actualDN) |
| | | .as("Unexpected DN for entry " + resultEntry) |
| | | .isEqualToIgnoringCase(expectedDN); |
| | | } |
| | | |
| | | private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception |
| | |
| | | while (!cnIndexDB.isEmpty()) |
| | | { |
| | | debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count()); |
| | | Thread.sleep(200); |
| | | Thread.sleep(10); |
| | | } |
| | | |
| | | debugInfo(tn, "Ending test with success"); |
| | |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | | @Mock |
| | | private ReplicationDomainDB domainDB; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors; |
| | | |
| | | private List<DN> eclEnabledDomains; |
| | | private MultiDomainDBCursor multiDomainCursor; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors; |
| | | private Map<DN, DomainDBCursor> domainDBCursors; |
| | | private ChangelogState initialState; |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | | private ChangeNumberIndexer cnIndexer; |
| | |
| | | public void setup() throws Exception |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn( |
| | | multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | /** |
| | | * Starts the indexer with BASE_DN1 as the only ECL enabled domain and no replica, |
| | | * then calls assertExternalChangelogContent() with no expected message. |
| | | */ |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | { |
| | | // set the domain list first, then start the indexer exactly once |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | setCNIndexDBInitialRecords(msg1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // simulate messages received out of order |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDifferentDomains() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN2, serverId2); |
| | | startCNIndexer(BASE_DN1, BASE_DN2); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | setCNIndexDBInitialRecords(msg1, msg2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | addReplica(BASE_DN1, serverId3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // cn=admin data does not participate in the external changelog |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneGoingOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1)); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | |
| | | // blocked until we receive info for serverId2 |
| | | assertExternalChangelogContent(); |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1)); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg2, msg3); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | // MCP moves forward because serverId1 is not really offline |
| | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneKilled() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | startCNIndexer(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | | cursors.put(Pair.of(baseDN, serverId), cursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(cursor); |
| | | final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); |
| | | replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); |
| | | |
| | | if (isECLEnabledDomain2(baseDN)) |
| | | { |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class))) |
| | | .thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | | getDomainNewestCSNs(baseDN)); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | |
| | | return serverState; |
| | | } |
| | | |
| | | /* Creates the ChangeNumberIndexer under test with isECLEnabledDomain() overridden, starts it, then calls waitForWaitingState() on it. */ private void startCNIndexer(DN... eclEnabledDomains) |
| | | /* NOTE(review): second signature line for the same method; looks like the post-change side of a diff - confirm which signature is current. */ private void startCNIndexer() |
| | | { |
| | | /* Only read by the first return statement in the override below. */ final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains); |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) |
| | | { |
| | | @Override |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | | return eclEnabledDomainList.contains(baseDN); |
| | | /* NOTE(review): unreachable after the return above; presumably the replacement for that line - confirm. */ return isECLEnabledDomain2(baseDN); |
| | | } |
| | | |
| | | }; |
| | | cnIndexer.start(); |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private boolean isECLEnabledDomain2(DN baseDN) |
| | | { |
| | | return eclEnabledDomains.contains(baseDN); |
| | | } |
| | | |
| | | private void stopCNIndexer() throws Exception |
| | | { |
| | | if (cnIndexer != null) |
| | |
| | | final CSN csn = newestMsg.getCSN(); |
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); |
| | | final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | final SequentialDBCursor cursor = |
| | | replicaDBCursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | } |
| | | initialCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | if (msg.isEmptyCursor()) |
| | | { |
| | | cursor.add(null); |
| | |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Checks that {@code getPrecedingCSN()} on a freshly built ChangeNumberIndexer |
| | | * returns the expected CSN for each row of the "precedingCSNDataProvider" data provider. |
| | | * |
| | | * @param start the CSN handed to {@code getPrecedingCSN()} |
| | | * @param expected the CSN the call must return |
| | | */ |
| | | @Test(dataProvider = "precedingCSNDataProvider") |
| | | public void getPrecedingCSN(CSN start, CSN expected) |
| | | { |
| | | cnIndexer = new ChangeNumberIndexer(changelogDB, initialState); |
| | | assertThat(cnIndexer.getPrecedingCSN(start)).isEqualTo(expected); |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Collections; |
| | | import java.util.Iterator; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | |
| | | public class CompositeDBCursorTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | /** |
| | | * Minimal concrete {@link CompositeDBCursor} for these tests: it has no new |
| | | * cursors to incorporate and never reports any removed cursor. |
| | | */ |
| | | private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String> |
| | | { |
| | | @Override |
| | | protected void incorporateNewCursors() throws ChangelogException |
| | | { |
| | | // intentionally empty: the tests add their cursors explicitly |
| | | } |
| | | |
| | | @Override |
| | | protected Iterator<String> removedCursorsIterator() |
| | | { |
| | | // Typed empty list: avoids the raw type / unchecked conversion of Collections.EMPTY_LIST |
| | | return Collections.<String> emptyList().iterator(); |
| | | } |
| | | } |
| | | |
| | | private UpdateMsg msg1; |
| | | private UpdateMsg msg2; |
| | | private UpdateMsg msg3; |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | // TODO : this test fails because msg2 is returned twice |
| | | @Test(enabled=false) |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | |
| | | /** |
| | | * Builds a composite cursor over the supplied (cursor, data) pairs. |
| | | * <p> |
| | | * Each underlying cursor is advanced once with {@code next()} before being added. |
| | | * |
| | | * @param pairs the cursors to aggregate, each paired with the String data registered for it |
| | | * @return a new {@link ConcreteCompositeDBCursor} holding all the supplied cursors |
| | | * @throws Exception if advancing or adding one of the cursors fails |
| | | */ |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| | | final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor(); |
| | | for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs) |
| | | { |
| | | final DBCursor<UpdateMsg> dbCursor = pair.getFirst(); |
| | | // The cursors in the composite are expected to be pointing |
| | | // to first record available |
| | | dbCursor.next(); |
| | | cursor.addCursor(dbCursor, pair.getSecond()); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |