OPENDJ-1559 (CR-4758) Create ReplicaId class to replace use of Pair<DN, Integer>
Added ReplicaId class + replaced all uses of Pair<DN, Integer> with ReplicaId.
1 file added
5 files modified
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor; |
| | |
| | | import org.opends.server.types.WritabilityMode; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | |
| | | private final SearchPhase startPhase; |
| | | private final Set<DN> excludedBaseDNs; |
| | | private final MultiDomainServerState cookie; |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR); |
| | | private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData = |
| | | new ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>>(); |
| | | |
| | | private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase, MultiDomainServerState cookie, |
| | | Set<DN> excludedBaseDNs) |
| | |
| | | |
| | | private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn) |
| | | { |
| | | final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId()); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId()); |
| | | SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId); |
| | | if (data == null) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Replica identifier comprised of the domain baseDN and its serverId within |
| | | * this domain. |
| | | */ |
| | | public final class ReplicaId implements Comparable<ReplicaId> |
| | | { |
| | | |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | | |
| | | /** |
| | | * Creates a ReplicaId with the provided parameters. |
| | | * |
| | | * @param baseDN |
| | | * domain baseDN, cannot be null |
| | | * @param serverId |
| | | * serverId within the domain |
| | | */ |
| | | private ReplicaId(DN baseDN, int serverId) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicaId with the provided parameters. |
| | | * |
| | | * @param baseDN |
| | | * domain baseDN |
| | | * @param serverId |
| | | * serverId within the domain |
| | | * @return a new ReplicaId |
| | | */ |
| | | public static ReplicaId of(DN baseDN, int serverId) |
| | | { |
| | | return new ReplicaId(baseDN, serverId); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int compareTo(ReplicaId o) |
| | | { |
| | | final int compareResult = baseDN.compareTo(o.baseDN); |
| | | if (compareResult == 0) |
| | | { |
| | | return serverId - o.serverId; |
| | | } |
| | | return compareResult; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | final int prime = 31; |
| | | int result = 1; |
| | | result = prime * result + ((baseDN == null) ? 0 : baseDN.hashCode()); |
| | | return prime * result + serverId; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean equals(Object obj) |
| | | { |
| | | if (this == obj) |
| | | { |
| | | return true; |
| | | } |
| | | if (obj instanceof ReplicaId) |
| | | { |
| | | return false; |
| | | } |
| | | final ReplicaId other = (ReplicaId) obj; |
| | | return serverId == other.serverId && baseDN.equals(other.baseDN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + "(" + baseDN + " " + serverId + ")"; |
| | | } |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); |
| | | private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>(); |
| | | private ReplicationEnvironment replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaID); |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaId); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaID, cursors); |
| | | replicaCursors.put(replicaId, cursors); |
| | | } |
| | | cursors.add(replicaCursor); |
| | | } |
| | |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | |
| | | { |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); |
| | | if (cursors != null) |
| | | { |
| | | for (ReplicaCursor cursor : cursors) |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); |
| | | private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaID); |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaId); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaID, cursors); |
| | | replicaCursors.put(replicaId, cursors); |
| | | } |
| | | cursors.add(replicaCursor); |
| | | } |
| | |
| | | else if (cursor instanceof ReplicaCursor) |
| | | { |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | |
| | | |
| | | private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (ReplicaCursor cursor : cursors) |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * {@link DBCursor} over a replica returning {@link UpdateMsg}s. |
| | |
| | | new AtomicReference<ReplicaOfflineMsg>(); |
| | | private UpdateMsg currentRecord; |
| | | |
| | | private final Pair<DN, Integer> replicaID; |
| | | private final ReplicaId replicaId; |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | /** |
| | |
| | | * @param offlineCSN |
| | | * the offline CSN from which to build the |
| | | * {@link ReplicaOfflineMsg} to return |
| | | * @param replicaID |
| | | * the baseDN => serverId pair to uniquely identify the replica |
| | | * @param replicaId |
| | | * the replica identifier |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | */ |
| | | public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, |
| | | Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB) |
| | | public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, ReplicaId replicaId, ReplicationDomainDB domainDB) |
| | | { |
| | | this.cursor = cursor; |
| | | this.replicaID = replicaID; |
| | | this.replicaId = replicaId; |
| | | this.domainDB = domainDB; |
| | | setOfflineCSN(offlineCSN); |
| | | } |
| | |
| | | * |
| | | * @return the replica identifier that this cursor is associated to |
| | | */ |
| | | public Pair<DN, Integer> getReplicaID() |
| | | public ReplicaId getReplicaId() |
| | | { |
| | | return replicaID; |
| | | return replicaId; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.*; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Matchers.*; |
| | |
| | | |
| | | private List<DN> eclEnabledDomains; |
| | | private MultiDomainDBCursor multiDomainCursor; |
| | | private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors; |
| | | private Map<ReplicaId, SequentialDBCursor> replicaDBCursors; |
| | | private Map<DN, DomainDBCursor> domainDBCursors; |
| | | private ChangelogState initialState; |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | initialState = new ChangelogState(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | replicaDBCursors = new HashMap<ReplicaId, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | |
| | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); |
| | | replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); |
| | | replicaDBCursors.put(ReplicaId.of(baseDN, serverId), replicaDBCursor); |
| | | |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | | { |
| | | final SequentialDBCursor cursor = |
| | | replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | replicaDBCursors.get(ReplicaId.of(msg.getBaseDN(), msg.getCSN().getServerId())); |
| | | if (msg.isEmptyCursor()) |
| | | { |
| | | cursor.add(null); |