| | |
| | | * @param mdss The provided string representation of the state. |
| | | * @throws DirectoryException when the string has an invalid format |
| | | */ |
| | | public MultiDomainServerState(String mdss) |
| | | throws DirectoryException |
| | | public MultiDomainServerState(String mdss) throws DirectoryException |
| | | { |
| | | list = splitGenStateToServerStates(mdss); |
| | | } |
| | |
| | | |
| | | synchronized(this) |
| | | { |
| | | int serverId = csn.getServerId(); |
| | | ServerState oldServerState = list.get(baseDN); |
| | | if (oldServerState == null) |
| | | oldServerState = new ServerState(); |
| | | |
| | | if (csn.isNewerThan(oldServerState.getCSN(serverId))) |
| | | { |
| | | oldServerState.update(csn); |
| | | oldServerState = new ServerState(); |
| | | list.put(baseDN, oldServerState); |
| | | return true; |
| | | } |
| | | return false; |
| | | return oldServerState.update(csn); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void update(DN baseDN, ServerState serverState) |
| | | { |
| | | list.put(baseDN, serverState); |
| | | for (CSN csn : serverState) |
| | | { |
| | | update(baseDN, csn); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the current object with the provided multi domain server state. |
| | | * |
| | | * @param state |
| | | * The provided multi domain server state. |
| | | */ |
| | | public void update(MultiDomainServerState state) |
| | | { |
| | | for (Entry<DN, ServerState> entry : state.list.entrySet()) |
| | | { |
| | | update(entry.getKey(), entry.getValue()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder res = new StringBuilder(); |
| | | final StringBuilder res = new StringBuilder(); |
| | | if (list != null && !list.isEmpty()) |
| | | { |
| | | for (Entry<DN, ServerState> entry : list.entrySet()) |
| | |
| | | buffer.append(this); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Tests if the state is empty. |
| | | * |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the ServerState associated to the provided replication domain's |
| | | * baseDN. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain's baseDN |
| | | * @return the associated ServerState |
| | | */ |
| | | public ServerState get(DN baseDN) |
| | | { |
| | | return list.get(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Test if this object equals the provided other object. |
| | | * @param other The other object with which we want to test equality. |
| | | * @return Returns True if this equals other, else return false. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Test if this object covers the provided CSN for the provided baseDN. |
| | | * |
| | | * @param baseDN |
| | | * The provided baseDN. |
| | | * @param csn |
| | | * The provided CSN. |
| | | * @return true when this object covers the provided CSN for the provided |
| | | * baseDN. |
| | | */ |
| | | public boolean cover(DN baseDN, CSN csn) |
| | | { |
| | | final ServerState state = list.get(baseDN); |
| | | return state != null && state.cover(csn); |
| | | } |
| | | |
| | | /** |
| | | * Splits the provided generalizedServerState being a String with the |
| | | * following syntax: "domain1:state1;domain2:state2;..." to a Map of (domain |
| | | * DN, domain ServerState). |
| | |
| | | public static Map<DN, ServerState> splitGenStateToServerStates( |
| | | String multiDomainServerState) throws DirectoryException |
| | | { |
| | | Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>(); |
| | | final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>(); |
| | | if (multiDomainServerState != null && multiDomainServerState.length() > 0) |
| | | { |
| | | try |
| | |
| | | ServerState startAfterServerState) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting after the provided {@link CSN}. |
| | | * <p> |
| | | * The cursor is already advanced to the records after the CSN. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN of the replicaDB |
| | | * @param serverId |
| | | * the serverId of the replicaDB |
| | | * @param startAfterCSN |
| | | * Starting point for the ReplicaDB cursor. If the CSN is null, then |
| | | * start from the oldest CSN for this replicaDB |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, CSN) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Publishes the provided change to the changelog DB for the specified |
| | | * serverId and replication domain. After a change has been successfully |
| | | * published, it becomes available to be returned by the External ChangeLog. |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | /** |
| | | * Thread responsible for inserting replicated changes into the ChangeNumber |
| | | * Index DB (CNIndexDB for short). Only changes older than the medium |
| | | * consistency point are inserted in the CNIndexDB. As a consequence this class |
| | | * is also responsible for maintaining the medium consistency point. |
| | | */ |
| | | public class ChangeNumberIndexer extends DirectoryThread |
| | | { |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * previousCookie and mediumConsistencyPoint must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. This solution also avoids |
| | | * using a queue that could fill up before we have consumed all its content. |
| | | */ |
| | | /** |
| | | * Stores the value of the cookie before the change currently processed is |
| | | * inserted in the DB. After insert, it is updated with the CSN of the change |
| | | * currently processed (thus becoming the "current" cookie just before the |
| | | * change is returned. |
| | | */ |
| | | private final MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Holds the medium consistency point for the current replication server. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names for a description of what the medium consistency point |
| | | * is</a> |
| | | */ |
| | | private final MultiDomainServerState mediumConsistencyPoint = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Composite cursor across all the replicaDBs for all the replication domains. |
| | | * It is volatile to ensure it supports concurrent update. Each time it is |
| | | * used more than once in a method, the method must take a local copy to |
| | | * ensure the cursor does not get updated in the middle of the method. |
| | | */ |
| | | private volatile CompositeDBCursor<DN> crossDomainDBCursor; |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | | * i.e. from the same thread that will make use of them. If this rule is not |
| | | * obeyed, then a JE exception will be thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** This map can be updated by multiple threads. */ |
| | | private ConcurrentMap<Integer, DN> newCursors = |
| | | new ConcurrentSkipListMap<Integer, DN>(); |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | | * |
| | | * @param changelogDB |
| | | * the changelogDB |
| | | * @param changelogState |
| | | * the changelog state used for initialization |
| | | */ |
| | | ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) |
| | | { |
| | | super("Change number indexer"); |
| | | this.changelogDB = changelogDB; |
| | | this.changelogState = changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Ensures the medium consistency point is updated by heartbeats. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the domain for which the heartbeat is published |
| | | * @param heartbeatCSN |
| | | * the CSN coming from the heartbeat |
| | | */ |
| | | public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) |
| | | { |
| | | mediumConsistencyPoint.update(baseDN, heartbeatCSN); |
| | | final CompositeDBCursor<DN> localCursor = crossDomainDBCursor; |
| | | final DN changeBaseDN = localCursor.getData(); |
| | | final CSN changeCSN = localCursor.getRecord().getCSN(); |
| | | tryNotify(changeBaseDN, changeCSN); |
| | | } |
| | | |
| | | /** |
| | | * Ensures the medium consistency point is updated by UpdateMsg. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the domain for which the heartbeat is published |
| | | * @param updateMsg |
| | | * the updateMsg that will update the medium consistency point |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | mediumConsistencyPoint.update(baseDN, csn); |
| | | newCursors.put(csn.getServerId(), baseDN); |
| | | tryNotify(baseDN, csn); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | | private void tryNotify(final DN baseDN, final CSN csn) |
| | | { |
| | | if (mediumConsistencyPoint.cover(baseDN, csn)) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | notify(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void initialize() throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = |
| | | changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | if (newestRecord != null) |
| | | { |
| | | previousCookie.update( |
| | | new MultiDomainServerState(newestRecord.getPreviousCookie())); |
| | | } |
| | | |
| | | // initialize the cross domain DB cursor |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | for (Entry<DN, List<Integer>> entry |
| | | : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | final ServerState previousSS = previousCookie.get(baseDN); |
| | | final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null; |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | mediumConsistencyPoint.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | crossDomainDBCursor = newCompositeDBCursor(); |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | final UpdateMsg record = crossDomainDBCursor.getRecord(); |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | // TODO JNR remove |
| | | throw new RuntimeException("They do not equal! recordCSN=" |
| | | + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()); |
| | | } |
| | | // TODO JNR is it possible to use the following line instead? |
| | | // previousCookie.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | // TODO JNR would this mean updating the if above? |
| | | previousCookie.update(crossDomainDBCursor.getData(), record.getCSN()); |
| | | crossDomainDBCursor.next(); |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, DN> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry |
| | | : this.allCursors.entrySet()) |
| | | { |
| | | for (Entry<Integer, DBCursor<UpdateMsg>> entry2 |
| | | : entry.getValue().entrySet()) |
| | | { |
| | | cursors.put(entry2.getValue(), entry.getKey()); |
| | | } |
| | | } |
| | | final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors); |
| | | result.next(); |
| | | return result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn) |
| | | throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | | { |
| | | map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>(); |
| | | allCursors.put(baseDN, map); |
| | | } |
| | | DBCursor<UpdateMsg> cursor = map.get(serverId); |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, csn); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | try |
| | | { |
| | | /* |
| | | * initialize here to allow fast application start up and avoid errors due |
| | | * cursors being created in a different thread to the one where they are |
| | | * used. |
| | | */ |
| | | initialize(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // TODO JNR error message i18n |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // TODO Auto-generated catch block |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return; |
| | | } |
| | | |
| | | while (!isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | | createNewCursors(); |
| | | |
| | | final UpdateMsg msg = crossDomainDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | wait(); |
| | | } |
| | | // advance cursor, success/failure will be checked later |
| | | crossDomainDBCursor.next(); |
| | | // loop to check whether new changes have been added to the ReplicaDBs |
| | | continue; |
| | | } |
| | | |
| | | final CSN csn = msg.getCSN(); |
| | | final DN baseDN = crossDomainDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, thread will be blocked |
| | | if (!mediumConsistencyPoint.cover(baseDN, csn)) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | | synchronized (this) |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!mediumConsistencyPoint.cover(baseDN, csn)) |
| | | { |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | | // point have been added to the ReplicaDBs |
| | | continue; |
| | | } |
| | | } |
| | | } |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB |
| | | final ChangeNumberIndexRecord record = |
| | | new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn); |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | // update, so it becomes the previous cookie for the next change |
| | | previousCookie.update(baseDN, csn); |
| | | |
| | | // advance cursor, success/failure will be checked later |
| | | crossDomainDBCursor.next(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | // TODO JNR error message i18n |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // was shutdown called? |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator(); |
| | | iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DN> entry = iter.next(); |
| | | if (!ensureCursorExists(entry.getValue(), entry.getKey(), null)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | | iter.remove(); |
| | | } |
| | | if (newCursorAdded) |
| | | { |
| | | crossDomainDBCursor = newCompositeDBCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | |
| | | * {@link DBCursor} implementation that iterates across a Collection of |
| | | * {@link DBCursor}s, advancing from the oldest to the newest change cross all |
| | | * cursors. |
| | | * |
| | | * @param <Data> |
| | | * The type of data associated with each cursor |
| | | */ |
| | | final class CompositeDBCursor implements DBCursor<UpdateMsg> |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private UpdateMsg currentChange; |
| | | private final List<DBCursor<UpdateMsg>> exhaustedCursors = |
| | | new ArrayList<DBCursor<UpdateMsg>>(); |
| | | private UpdateMsg currentRecord; |
| | | private Data currentData; |
| | | private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(); |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | | */ |
| | | private final NavigableSet<DBCursor<UpdateMsg>> cursors = |
| | | new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, Data>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | | public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a CompositeDBCursor using the provided collection of cursors. |
| | |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | */ |
| | | public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors) |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : cursors) |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | add(cursor); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. Copy the List to avoid ConcurrentModificationExceptions. |
| | | final DBCursor<UpdateMsg>[] copy = |
| | | exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]); |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | | for (DBCursor<UpdateMsg> cursor : copy) |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet()) |
| | | { |
| | | cursor.next(); |
| | | add(cursor); |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | if (cursors.isEmpty()) |
| | | { |
| | | // no cursors are left with changes. |
| | | currentChange = null; |
| | | currentRecord = null; |
| | | currentData = null; |
| | | return false; |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getRecord(); |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry(); |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | currentRecord = cursor.getRecord(); |
| | | currentData = entry.getValue(); |
| | | cursor.next(); |
| | | add(cursor); |
| | | put(entry); |
| | | return true; |
| | | } |
| | | |
| | | private void add(DBCursor<UpdateMsg> cursor) |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | final Data data = entry.getValue(); |
| | | if (cursor.getRecord() != null) |
| | | { |
| | | this.cursors.add(cursor); |
| | | this.cursors.put(cursor, data); |
| | | } |
| | | else |
| | | { |
| | | this.exhaustedCursors.add(cursor); |
| | | this.exhaustedCursors.put(cursor, data); |
| | | } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentChange; |
| | | return currentRecord; |
| | | } |
| | | |
| | | /** |
| | | * Returns the data associated to the cursor that returned the current record. |
| | | * |
| | | * @return the data associated to the cursor that returned the current record. |
| | | */ |
| | | public Data getData() |
| | | { |
| | | return currentData; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | StaticUtils.close(cursors); |
| | | StaticUtils.close(exhaustedCursors); |
| | | StaticUtils.close(cursors.keySet()); |
| | | StaticUtils.close(exhaustedCursors.keySet()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentChange=" + currentChange |
| | | + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors; |
| | | return getClass().getSimpleName() + " currentRecord=" + currentRecord |
| | | + " currentData=" + currentData + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentChange=" + currentChange + "" |
| | | + replicaDB; |
| | | return getClass().getSimpleName() + " currentChange=" + currentChange |
| | | + " replicaDB=" + replicaDB; |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.lang.Thread.State; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import org.mockito.ArgumentCaptor; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.Pair; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Matchers.*; |
| | | import static org.mockito.Mockito.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class ChangeNumberIndexerTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | private static final class ReplicatedUpdateMsg extends UpdateMsg |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private final boolean emptyCursor; |
| | | |
| | | public ReplicatedUpdateMsg(DN baseDN, CSN csn) |
| | | { |
| | | this(baseDN, csn, false); |
| | | } |
| | | |
| | | public ReplicatedUpdateMsg(DN baseDN, CSN csn, boolean emptyCursor) |
| | | { |
| | | super(csn, null); |
| | | this.baseDN = baseDN; |
| | | this.emptyCursor = emptyCursor; |
| | | } |
| | | |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | | |
| | | public boolean isEmptyCursor() |
| | | { |
| | | return emptyCursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "csn=" + getCSN() + ", baseDN=" + baseDN; |
| | | } |
| | | } |
| | | |
| | | private static DN BASE_DN; |
| | | private static final int serverId1 = 101; |
| | | private static final int serverId2 = 102; |
| | | |
| | | private ChangelogDB changelogDB; |
| | | private ChangeNumberIndexDB cnIndexDB; |
| | | private ReplicationDomainDB domainDB; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> cursors = |
| | | new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | private ChangelogState initialState; |
| | | private ChangeNumberIndexer indexer; |
| | | private MultiDomainServerState previousCookie; |
| | | |
| | | @BeforeClass |
| | | public static void classSetup() throws Exception |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | BASE_DN = DN.decode("dc=example,dc=com"); |
| | | } |
| | | |
| | | @AfterClass |
| | | public static void classTearDown() throws Exception |
| | | { |
| | | TestCaseUtils.shutdownFakeServer(); |
| | | } |
| | | |
| | | @BeforeMethod |
| | | public void setup() throws Exception |
| | | { |
| | | changelogDB = mock(ChangelogDB.class); |
| | | cnIndexDB = mock(ChangeNumberIndexDB.class); |
| | | domainDB = mock(ReplicationDomainDB.class); |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | |
| | | initialState = new ChangelogState(); |
| | | previousCookie = new MultiDomainServerState(); |
| | | } |
| | | |
| | | |
| | | private static final String EMPTY_DB_NO_DS = "emptyDBNoDS"; |
| | | |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | { |
| | | startIndexer(); |
| | | verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class)); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDS() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | |
| | | assertAddedRecords(msg1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneInitialDS() throws Exception |
| | | { |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | addReplica(BASE_DN, serverId1); |
| | | setDBInitialRecords(msg1); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2); |
| | | publishUpdateMsg(msg2); |
| | | |
| | | assertAddedRecords(msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSs() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg2, msg1); |
| | | |
| | | assertAddedRecords(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoInitialDSs() throws Exception |
| | | { |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | setDBInitialRecords(msg1, msg2); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3); |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4); |
| | | publishUpdateMsg(msg3, msg4); |
| | | |
| | | assertAddedRecords(msg3, msg4); |
| | | } |
| | | |
| | | @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception |
| | | { |
| | | // TODO JNR make this tests work |
| | | addReplica(BASE_DN, serverId1); |
| | | addReplica(BASE_DN, serverId2); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1); |
| | | final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2); |
| | | final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN, serverId1, 2); |
| | | final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3); |
| | | // simulate no messages received during some time for replica 2 |
| | | publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1); |
| | | |
| | | assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | addReplica(BASE_DN, serverId1); |
| | | startIndexer(); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | |
| | | addReplica(BASE_DN, serverId2); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2); |
| | | publishUpdateMsg(msg2); |
| | | |
| | | assertAddedRecords(msg1, msg2); |
| | | } |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | | cursors.put(Pair.of(baseDN, serverId), cursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | .thenReturn(cursor); |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState()); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | | private void startIndexer() |
| | | { |
| | | indexer = new ChangeNumberIndexer(changelogDB, initialState); |
| | | indexer.start(); |
| | | waitForWaitingState(indexer); |
| | | } |
| | | |
| | | private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time) |
| | | { |
| | | return new ReplicatedUpdateMsg(baseDN, new CSN(time, 0, serverId)); |
| | | } |
| | | |
| | | private ReplicatedUpdateMsg emptyCursor(DN baseDN, int serverId) |
| | | { |
| | | return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true); |
| | | } |
| | | |
| | | private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | // Initialize the previous cookie that will be used to compare the records |
| | | // added to the CNIndexDB at the end of this test |
| | | for (int i = 0; i < msgs.length; i++) |
| | | { |
| | | ReplicatedUpdateMsg msg = msgs[i]; |
| | | if (i + 1 == msgs.length) |
| | | { |
| | | final ReplicatedUpdateMsg newestMsg = msg; |
| | | final DN baseDN = newestMsg.getBaseDN(); |
| | | final CSN csn = newestMsg.getCSN(); |
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn)); |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | cursor.next(); // simulate the cursor had been initialized with this change |
| | | } |
| | | previousCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | | } |
| | | } |
| | | |
| | | private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | final SequentialDBCursor cursor = |
| | | cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | if (msg.isEmptyCursor()) |
| | | { |
| | | cursor.add(null); |
| | | } |
| | | else |
| | | { |
| | | cursor.add(msg); |
| | | } |
| | | } |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | if (!msg.isEmptyCursor()) |
| | | { |
| | | indexer.publishUpdateMsg(msg.getBaseDN(), msg); |
| | | } |
| | | } |
| | | waitForWaitingState(indexer); |
| | | } |
| | | |
| | | private void waitForWaitingState(final Thread t) |
| | | { |
| | | State state = t.getState(); |
| | | while (!state.equals(State.WAITING) && !state.equals(State.TERMINATED)) |
| | | { |
| | | Thread.yield(); |
| | | state = t.getState(); |
| | | } |
| | | assertThat(state).isEqualTo(State.WAITING); |
| | | } |
| | | |
| | | private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | final ArgumentCaptor<ChangeNumberIndexRecord> arg = |
| | | ArgumentCaptor.forClass(ChangeNumberIndexRecord.class); |
| | | verify(cnIndexDB, atLeast(0)).addRecord(arg.capture()); |
| | | final List<ChangeNumberIndexRecord> allValues = arg.getAllValues(); |
| | | |
| | | // recheck it was not called more than expected |
| | | assertThat(allValues).hasSameSizeAs(msgs); |
| | | for (int i = 0; i < msgs.length; i++) |
| | | { |
| | | final ReplicatedUpdateMsg msg = msgs[i]; |
| | | final ChangeNumberIndexRecord record = allValues.get(i); |
| | | // check content in order |
| | | String description = "expected: <" + msg + ">, but got: <" + record + ">"; |
| | | assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN()); |
| | | assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN()); |
| | | assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString()); |
| | | previousCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.util.Pair; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.util.Pair.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | @SuppressWarnings({ "javadoc", "unchecked" }) |
| | | public class CompositeDBCursorTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | private static class MyDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final List<UpdateMsg> msgs; |
| | | private UpdateMsg current; |
| | | |
| | | public MyDBCursor(UpdateMsg... msgs) |
| | | { |
| | | this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs)); |
| | | next(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return this.current; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (!this.msgs.isEmpty()) |
| | | { |
| | | this.current = this.msgs.remove(0); |
| | | return true; |
| | | } |
| | | this.current = null; |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | } |
| | | |
| | | private UpdateMsg msg1; |
| | | private UpdateMsg msg2; |
| | | private UpdateMsg msg3; |
| | | private UpdateMsg msg4; |
| | | private String baseDN1 = "dc=forgerock,dc=com"; |
| | | private String baseDN2 = "dc=example,dc=com"; |
| | | |
| | | @BeforeClass |
| | | public void setupMsgs() |
| | |
| | | @Test |
| | | public void emptyCursor() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = newCompositeDBCursor(new MyDBCursor()); |
| | | final CompositeDBCursor<String> compCursor = |
| | | newCompositeDBCursor(of(new SequentialDBCursor(), baseDN1)); |
| | | assertInOrder(compCursor); |
| | | } |
| | | |
| | | @Test |
| | | public void oneElementCursor() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = |
| | | newCompositeDBCursor(new MyDBCursor(msg1)); |
| | | assertInOrder(compCursor, msg1); |
| | | final CompositeDBCursor<String> compCursor = |
| | | newCompositeDBCursor(of(new SequentialDBCursor(msg1), baseDN1)); |
| | | assertInOrder(compCursor, of(msg1, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void twoElementsCursor() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = |
| | | newCompositeDBCursor(new MyDBCursor(msg1, msg2)); |
| | | assertInOrder(compCursor, msg1, msg2); |
| | | final CompositeDBCursor<String> compCursor = |
| | | newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2), baseDN1)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN1), |
| | | of(msg2, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void twoEmptyCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = newCompositeDBCursor( |
| | | new MyDBCursor(), |
| | | new MyDBCursor()); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(), baseDN1), |
| | | of(new SequentialDBCursor(), baseDN2)); |
| | | assertInOrder(compCursor); |
| | | } |
| | | |
| | | @Test |
| | | public void twoOneElementCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = newCompositeDBCursor( |
| | | new MyDBCursor(msg2), |
| | | new MyDBCursor(msg1)); |
| | | assertInOrder(compCursor, msg1, msg2); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2), baseDN1), |
| | | of(new SequentialDBCursor(msg1), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void twoTwoElementCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = newCompositeDBCursor( |
| | | new MyDBCursor(msg2, msg3), |
| | | new MyDBCursor(msg1, msg4)); |
| | | assertInOrder(compCursor, msg1, msg2, msg3, msg4); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, msg3), baseDN1), |
| | | of(new SequentialDBCursor(msg1, msg4), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor compCursor = newCompositeDBCursor( |
| | | new MyDBCursor(msg2, null, msg3), |
| | | new MyDBCursor(null, msg1, msg4)); |
| | | assertInOrder(compCursor, msg1, msg2, msg3, msg4); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, null, msg3), baseDN1), |
| | | of(new SequentialDBCursor(null, msg1, msg4), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementCursorsTODOJNR() throws Exception |
| | | { |
| | | SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3); |
| | | SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4); |
| | | cursor1.next(); |
| | | cursor2.next(); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(cursor1, baseDN1), |
| | | of(cursor2, baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2)); |
| | | } |
| | | |
| | | private UpdateMsg newUpdateMsg(int t) |
| | |
| | | return new UpdateMsg(new CSN(t, t, t), new byte[t]); |
| | | } |
| | | |
| | | private CompositeDBCursor newCompositeDBCursor(DBCursor<UpdateMsg>... cursors) |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| | | return new CompositeDBCursor(Arrays.asList(cursors)); |
| | | final Map<DBCursor<UpdateMsg>, String> cursorsMap = |
| | | new HashMap<DBCursor<UpdateMsg>, String>(); |
| | | for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs) |
| | | { |
| | | cursorsMap.put(pair.getFirst(), pair.getSecond()); |
| | | } |
| | | return new CompositeDBCursor<String>(cursorsMap); |
| | | } |
| | | |
| | | private void assertInOrder(final CompositeDBCursor compCursor, |
| | | UpdateMsg... msgs) throws ChangelogException |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |
| | | Pair<UpdateMsg, String>... expecteds) throws ChangelogException |
| | | { |
| | | for (UpdateMsg msg : msgs) |
| | | for (final Pair<UpdateMsg, String> expected : expecteds) |
| | | { |
| | | assertTrue(compCursor.next()); |
| | | assertSame(compCursor.getRecord(), msg); |
| | | assertSame(compCursor.getRecord(), expected.getFirst()); |
| | | assertSame(compCursor.getData(), expected.getSecond()); |
| | | } |
| | | assertFalse(compCursor.next()); |
| | | compCursor.close(); |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | class SequentialDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final List<UpdateMsg> msgs; |
| | | private UpdateMsg current; |
| | | |
| | | public SequentialDBCursor(UpdateMsg... msgs) |
| | | { |
| | | this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs)); |
| | | next(); |
| | | } |
| | | |
| | | public void add(UpdateMsg msg) |
| | | { |
| | | this.msgs.add(msg); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return current; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (!msgs.isEmpty()) |
| | | { |
| | | current = msgs.remove(0); |
| | | return current != null; |
| | | } |
| | | current = null; |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "currentRecord=" + current + " nextMessages=" + msgs; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.types; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Arrays; |
| | | |
| | | import org.opends.server.api.AttributeSyntax; |
| | | import org.opends.server.schema.OIDSyntax; |
| | | |
| | | /** |
| | |
| | | public interface AttributeTypeConstants |
| | | { |
| | | |
| | | AttributeSyntax<?> OID_SYNTAX = new OIDSyntax(); |
| | | |
| | | AttributeType OBJECT_CLASS = new AttributeType( |
| | | "( 2.5.4.0 NAME 'objectClass' EQUALITY objectIdentifierMatch " |
| | | + "SYNTAX 1.3.6.1.4.1.1466.115.121.1.38 X-ORIGIN 'RFC 2256' )", |
| | | "objectClass", Collections.singletonList("objectClass"), "2.5.4.0", null, |
| | | null, new OIDSyntax(), AttributeUsage.USER_APPLICATIONS, false, false, |
| | | false, false); |
| | | "objectClass", Arrays.asList("objectClass"), "2.5.4.0", |
| | | null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS, |
| | | false, false, false, false); |
| | | |
| | | AttributeType[] ALL = { OBJECT_CLASS, }; |
| | | AttributeType ORGANIZATION_NAME = new AttributeType( |
| | | "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )", |
| | | "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10", |
| | | null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS, |
| | | false, false, false, false); |
| | | |
| | | AttributeType ORGANIZATIONAL_UNIT_NAME = new AttributeType( |
| | | "( 2.5.4.11 NAME ( 'ou' 'organizationalUnitName' ) SUP name X-ORIGIN 'RFC 4519' )", |
| | | "organizationalUnitName", Arrays.asList("ou", "organizationalUnitName"), "2.5.4.11", |
| | | null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS, |
| | | false, false, false, false); |
| | | |
| | | AttributeType DOMAIN_COMPONENT = new AttributeType( |
| | | "( 0.9.2342.19200300.100.1.25 NAME ( 'dc' 'domainComponent' ) " |
| | | + "EQUALITY caseIgnoreIA5Match SUBSTR caseIgnoreIA5SubstringsMatch" |
| | | + "SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 SINGLE-VALUE X-ORIGIN 'RFC 4519' )", |
| | | "domainComponent", Arrays.asList("dc", "domainComponent"), "0.9.2342.19200300.100.1.25", |
| | | null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS, |
| | | false, false, false, false); |
| | | |
| | | AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME, |
| | | ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, }; |
| | | |
| | | } |