| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn, false); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn, |
| | | boolean startFromPrecedingCSN) throws ChangelogException |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | // start from preceding CSN for publishUpdateMsg(), |
| | | // or from the actual CSN when initializing from the previous cookie |
| | | final CSN startAfterCSN = |
| | | startFromPrecedingCSN ? getPrecedingCSN(csn) : csn; |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | |
| | | |
| | | /** |
| | | * Returns the immediately preceding CSN. |
| | | * |
| | | * @param csn |
| | | * the CSN to use |
| | | * @return the immediately preceding CSN or null if the provided CSN is null. |
| | | */ |
| | | private CSN getPrecedingCSN(CSN csn) |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn == null) |
| | | { |
| | |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), csn, true)) |
| | | // start after the preceding CSN so that the first CSN read is exactly the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |