| opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | ●●●●● patch | view | raw | blame | history | |
| opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | ●●●●● patch | view | raw | blame | history |
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
New file @@ -0,0 +1,144 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.*; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.util.StaticUtils; /** * {@link DBCursor} implementation that iterates across a Collection of * {@link DBCursor}s, advancing from the oldest to the newest change cross all * cursors. */ final class CompositeDBCursor implements DBCursor<UpdateMsg> { private UpdateMsg currentChange; private final List<DBCursor<UpdateMsg>> exhaustedCursors = new ArrayList<DBCursor<UpdateMsg>>(); /** * The cursors are sorted based on the current change of each cursor to * consider the next change across all available cursors. */ private final NavigableSet<DBCursor<UpdateMsg>> cursors = new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() { @Override public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) { final CSN csn1 = o1.getRecord().getCSN(); final CSN csn2 = o2.getRecord().getCSN(); return CSN.compare(csn1, csn2); } }); /** * Builds a CompositeDBCursor using the provided collection of cursors. * * @param cursors * the cursors that will be iterated upon. */ public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors) { for (DBCursor<UpdateMsg> cursor : cursors) { add(cursor); } } /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { // try to recycle empty cursors in case the underlying ReplicaDBs received // new changes for (Iterator<DBCursor<UpdateMsg>> iter = exhaustedCursors.iterator(); iter .hasNext();) { DBCursor<UpdateMsg> cursor = iter.next(); iter.remove(); cursor.next(); add(cursor); } if (cursors.isEmpty()) { // no cursors are left with changes. currentChange = null; return false; } // To keep consistent the cursors' order in the SortedSet, it is necessary // to remove and eventually add again a cursor (after moving it forward). final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); currentChange = cursor.getRecord(); cursor.next(); add(cursor); return true; } private void add(DBCursor<UpdateMsg> cursor) { if (cursor.getRecord() != null) { this.cursors.add(cursor); } else { this.exhaustedCursors.add(cursor); } } /** {@inheritDoc} */ @Override public UpdateMsg getRecord() { return currentChange; } /** {@inheritDoc} */ @Override public void close() { StaticUtils.close(cursors); StaticUtils.close(exhaustedCursors); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " currentChange=" + currentChange + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors; } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@ protected static final DebugTracer TRACER = getTracer(); /** * {@link DBCursor} implementation that iterates across all the ReplicaDBs of * a replication domain, advancing from the oldest to the newest change cross * all replicaDBs. */ private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg> { private final DN baseDN; private UpdateMsg currentChange; /** * The cursors are sorted based on the current change of each cursor to * consider the next change across all replicaDBs. */ private final NavigableSet<DBCursor<UpdateMsg>> cursors = new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>() { @Override public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2) { final CSN csn1 = o1.getRecord().getCSN(); final CSN csn2 = o2.getRecord().getCSN(); return CSN.compare(csn1, csn2); } }); public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) throws ChangelogException { this.baseDN = baseDN; for (int serverId : getDomainMap(baseDN).keySet()) { // get the last already sent CSN from that server to get a cursor final CSN lastCSN = startAfterServerState.getCSN(serverId); addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN)); } } private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) throws ChangelogException { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } return EMPTY_CURSOR; } @Override public boolean next() throws ChangelogException { if (cursors.isEmpty()) { currentChange = null; return false; } // To keep consistent the cursors' order in the SortedSet, it is necessary // to remove and eventually add again a cursor (after moving it forward). final DBCursor<UpdateMsg> cursor = cursors.pollFirst(); currentChange = cursor.getRecord(); cursor.next(); addCursorIfNotEmpty(cursor); return true; } void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor) { if (cursor.getRecord() != null) { cursors.add(cursor); } else { StaticUtils.close(cursor); } } @Override public UpdateMsg getRecord() { return currentChange; } @Override public void close() { StaticUtils.close(cursors); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " baseDN=" + baseDN + " currentChange=" + currentChange + " open cursors=" + cursors; } } /** * This map contains the List of updates received from each LDAP server. * <p> * When removing a domainMap, code: @@ -790,7 +686,29 @@ public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterServerState) throws ChangelogException { return new CrossReplicaDBCursor(baseDN, startAfterServerState); final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); final List<DBCursor<UpdateMsg>> cursors = new ArrayList<DBCursor<UpdateMsg>>(serverIds.size()); for (int serverId : serverIds) { // get the last already sent CSN from that server to get a cursor final CSN lastCSN = startAfterServerState.getCSN(serverId); cursors.add(getCursorFrom(baseDN, serverId, lastCSN)); } return new CompositeDBCursor(cursors); } private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) throws ChangelogException { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } return EMPTY_CURSOR; } private ServerState buildServerState(DN baseDN, CSN startAfterCSN)