| | |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.io.File; |
| | | import java.util.Collections; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | { |
| | | |
| | | /** |
| | | * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a |
| | | * replication domain, advancing from the oldest to the newest change cross |
| | | * all replicaDBs. |
| | | */ |
| | | private final class CrossReplicaDBCursor implements ReplicaDBCursor |
| | | { |
| | | |
| | | private UpdateMsg currentChange; |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all replicaDBs. |
| | | */ |
| | | private final NavigableSet<ReplicaDBCursor> cursors = |
| | | new TreeSet<ReplicaDBCursor>(); |
| | | private final DN baseDN; |
| | | |
| | | public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) |
| | | { |
| | | this.baseDN = baseDN; |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState.getCSN(serverId); |
| | | addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN)); |
| | | } |
| | | } |
| | | |
| | | private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | try |
| | | { |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // ignored |
| | | } |
| | | } |
| | | return EMPTY_CURSOR; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (cursors.isEmpty()) |
| | | { |
| | | currentChange = null; |
| | | return false; |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final ReplicaDBCursor cursor = cursors.pollFirst(); |
| | | currentChange = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(cursor); |
| | | return true; |
| | | } |
| | | |
| | | void addCursorIfNotEmpty(ReplicaDBCursor cursor) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public UpdateMsg getChange() |
| | | { |
| | | return currentChange; |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | StaticUtils.close(cursors); |
| | | } |
| | | |
| | | @Override |
| | | public int compareTo(ReplicaDBCursor o) |
| | | { |
| | | final CSN csn1 = getChange().getCSN(); |
| | | final CSN csn2 = o.getChange().getCSN(); |
| | | |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " baseDN=" + baseDN |
| | | + " currentChange=" + currentChange + " open cursors=" + cursors; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | |
| | | { |
| | | // empty |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "EmptyReplicaDBCursor"; |
| | | } |
| | | }; |
| | | |
| | | /** |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Set<Integer> getDomainServerIds(DN baseDN) |
| | | { |
| | | return Collections.unmodifiableSet(getDomainMap(baseDN).keySet()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getCount(DN baseDN, int serverId, CSN from, CSN to) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN) |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | // Builds a new serverState for all the serverIds in the replication domain |
| | | // to ensure we get cursors starting after the provided CSN. |
| | | return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN)); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) |
| | | { |
| | | return new CrossReplicaDBCursor(baseDN, startAfterServerState); |
| | | } |
| | | |
| | | private ServerState buildServerState(DN baseDN, CSN startAfterCSN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | | if (startAfterCSN == null) |
| | | { |
| | | try |
| | | return result; |
| | | } |
| | | |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | if (serverId == startAfterCSN.getServerId()) |
| | | { |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | // reuse the provided CSN one as it is the most accurate |
| | | result.update(startAfterCSN); |
| | | } |
| | | catch (ChangelogException e) |
| | | else |
| | | { |
| | | // ignored |
| | | // build a new CSN, ignoring the seqNum since it is irrelevant for |
| | | // a different serverId |
| | | final CSN csn = startAfterCSN; // only used for increased readability |
| | | result.update(new CSN(csn.getTime(), 0, serverId)); |
| | | } |
| | | } |
| | | return EMPTY_CURSOR; |
| | | return result; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |