| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * Thread responsible for inserting replicated changes into the ChangeNumber |
| | |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | */ |
| | |
| | | * |
| | | * @NonNull |
| | | */ |
| | | @SuppressWarnings("unchecked") |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor = |
| | | new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | | * i.e. from the same thread that will make use of them. If this rule is not |
| | | * obeyed, then a JE exception will be thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** |
| | | * Holds the newCursors that will have to be created in the next iteration |
| | | * inside the {@link #run()} method. |
| | | * <p> |
| | | * This map can be updated by multiple threads. |
| | | */ |
| | | private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( |
| | | new Comparator<Pair<DN, Integer>>() |
| | | { |
| | | @Override |
| | | public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2) |
| | | { |
| | | final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); |
| | | if (compareBaseDN == 0) |
| | | { |
| | | return o1.getSecond().compareTo(o2.getSecond()); |
| | | } |
| | | return compareBaseDN; |
| | | } |
| | | }); |
| | | private MultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | return; |
| | | } |
| | | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | lastAliveCSNs.update(baseDN, updateMsg.getCSN()); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | continue; |
| | | } |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | } |
| | | |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | /* |
| | | * initialize with the oldest possible CSN in order for medium |
| | | * consistency to wait for all replicas to be alive before moving |
| | | * forward |
| | | */ |
| | | lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); |
| | | // start after the actual CSN when initializing from the previous cookie |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | |
| | | return new CSN(0, 0, serverId); |
| | | } |
| | | |
| | | private void resetNextChangeForInsertDBCursor() throws ChangelogException |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, DN> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry |
| | | : this.allCursors.entrySet()) |
| | | { |
| | | for (Entry<Integer, DBCursor<UpdateMsg>> entry2 |
| | | : entry.getValue().entrySet()) |
| | | { |
| | | cursors.put(entry2.getValue(), entry.getKey()); |
| | | } |
| | | } |
| | | |
| | | // CNIndexer manages the cursor itself, |
| | | // so do not try to recycle exhausted cursors |
| | | CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false); |
| | | result.next(); |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN); |
| | | if (map == null) |
| | | { |
| | | map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>(); |
| | | allCursors.put(baseDN, map); |
| | | } |
| | | DBCursor<UpdateMsg> cursor = map.get(serverId); |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | cursor.next(); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Returns the immediately preceding CSN. |
| | | * |
| | | * @param csn |
| | | * the CSN to use |
| | | * @return the immediately preceding CSN or null if the provided CSN is null. |
| | | */ |
| | | CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn == null) |
| | | { |
| | | return null; |
| | | } |
| | | if (csn.getSeqnum() > 0) |
| | | { |
| | | return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); |
| | | } |
| | | return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | |
| | | { |
| | | try |
| | | { |
| | | if (!domainsToClear.isEmpty()) |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | removeCursors(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | else |
| | | { |
| | | final boolean createdCursors = createNewCursors(); |
| | | final boolean recycledCursors = recycleExhaustedCursors(); |
| | | if (createdCursors || recycledCursors) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | nextChangeForInsertDBCursor.removeDomain(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | |
| | | // Do not call DBCursor.next() here |
| | | // because we might not have consumed the last record, |
| | | // for example if we could not move the MCP forward |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | // check whether new changes have been added to the ReplicaDBs |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | else if (msg instanceof ReplicaOfflineMsg) |
| | | { |
| | | nextChangeForInsertDBCursor.next(); |
| | | continue; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // Message logged here gives corrective information to the administrator. |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Nothing can be done about it. |
| | | // Rely on the DirectoryThread uncaught exceptions handler |
| | | // for logging error + alert. |
| | | // Message logged here gives corrective information to the administrator. |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | logUnexpectedException(e); |
| | | // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. |
| | | throw new RuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | removeCursors(DN.NULL_DN); |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Nothing can be done about it. |
| | | * <p> |
| | | * Rely on the DirectoryThread uncaught exceptions handler for logging error + |
| | | * alert. |
| | | * <p> |
| | | * Message logged here gives corrective information to the administrator. |
| | | */ |
| | | private void logUnexpectedException(Exception e) |
| | | { |
| | | Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( |
| | | getClass().getSimpleName(), stackTraceToSingleLineString(e)); |
| | | TRACER.debugError(msg.toString()); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | boolean callNextOnCursor = true; |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | pair = getCursor(mcBaseDN, mcCSN.getServerId()); |
| | | Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond(); |
| | | if (iter != null && !iter.hasNext()) |
| | | { |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | iter.remove(); |
| | | StaticUtils.close(pair.getFirst()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | callNextOnCursor = false; |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | /* |
| | | * replica is not back online, Medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * cursor, offlineCSN from lastAliveCSN and remove all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | // TODO JNR how to close cursor for offline replica? |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | private void removeCursors(DN baseDN) |
| | | { |
| | | if (nextChangeForInsertDBCursor != null) |
| | | { |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | if (DN.NULL_DN.equals(baseDN)) |
| | | { |
| | | // close all cursors |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | } |
| | | else |
| | | { |
| | | // close cursors for this DN |
| | | final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN); |
| | | if (map != null) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();) |
| | | { |
| | | if (it.next().getFirst().equals(baseDN)) |
| | | { |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | getCursor(final DN baseDN, final int serverId) throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | | { |
| | | if (baseDN.equals(entry1.getKey())) |
| | | { |
| | | for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (serverId == entry2.getKey()) |
| | | { |
| | | return Pair.of(entry2.getValue(), iter); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return Pair.empty(); |
| | | } |
| | | |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | boolean succesfullyRecycled = false; |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : map.values()) |
| | | { |
| | | // try to recycle it by calling next() |
| | | if (cursor.getRecord() == null && cursor.next()) |
| | | { |
| | | succesfullyRecycled = true; |
| | | } |
| | | } |
| | | } |
| | | return succesfullyRecycled; |
| | | } |
| | | |
| | | private boolean createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = |
| | | newCursors.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | // start after preceding CSN so the first CSN read will exactly be the |
| | | // current one |
| | | final CSN startFromCSN = getPrecedingCSN(csn); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | | iter.remove(); |
| | | } |
| | | return newCursorAdded; |
| | | } |
| | | return false; |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** |