| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn, false); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn, |
| | | boolean startFromPrecedingCSN) throws ChangelogException |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | // start from preceding CSN for publishUpdateMsg(), |
| | | // or from the actual CSN when initializing from the previous cookie |
| | | final CSN startAfterCSN = |
| | | startFromPrecedingCSN ? getPrecedingCSN(csn) : csn; |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | |
| | | |
| | | /** |
| | | * Returns the immediately preceding CSN. |
| | | * |
| | | * @param csn |
| | | * the CSN to use |
| | | * @return the immediately preceding CSN or null if the provided CSN is null. |
| | | */ |
| | | private CSN getPrecedingCSN(CSN csn) |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn == null) |
| | | { |
| | |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), csn, true)) |
| | | // start after the preceding CSN so that the first CSN read is exactly the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | } |
| | | |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | | { |
| | | final SequentialDBCursor cursor = new SequentialDBCursor(); |
| | |
| | | |
| | | /** |
| | | * Initiates shutdown of the {@code cnIndexer} field when it is non-null; |
| | | * does nothing otherwise. |
| | | */ |
| | | private void stopCNIndexer() |
| | | { |
| | | // check for null before dereferencing, and only initiate shutdown once |
| | | if (cnIndexer != null) |
| | | { |
| | | cnIndexer.initiateShutdown(); |
| | | } |
| | | } |
| | | |
| | | private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time) |
| | |
| | | previousCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Supplies { start CSN, expected preceding CSN } pairs consumed by the test |
| | | * method declared with {@code dataProvider = "precedingCSNData"}. |
| | | * |
| | | * @return one row per test case |
| | | */ |
| | | @DataProvider |
| | | public Object[][] precedingCSNData() |
| | | { |
| | | final int time = 1000; |
| | | final int replicaId = 42; |
| | | // @formatter:off |
| | | final Object[] nullStart = { null, null, }; |
| | | final Object[] secondArgOne = { new CSN(time, 1, replicaId), new CSN(time, 0, replicaId), }; |
| | | final Object[] secondArgZero = { new CSN(time, 0, replicaId), new CSN(time - 1, Integer.MAX_VALUE, replicaId), }; |
| | | // @formatter:on |
| | | return new Object[][] { nullStart, secondArgOne, secondArgZero, }; |
| | | } |
| | | |
| | | /** |
| | | * Verifies that the indexer's getPrecedingCSN() returns the expected CSN. |
| | | * |
| | | * @param start |
| | | * the CSN passed to getPrecedingCSN() |
| | | * @param expected |
| | | * the CSN that getPrecedingCSN() must return |
| | | */ |
| | | @Test(dataProvider = "precedingCSNData") |
| | | public void getPrecedingCSN(CSN start, CSN expected) |
| | | { |
| | | assertThat(cnIndexer.getPrecedingCSN(start)).isEqualTo(expected); |
| | | } |
| | | } |