| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.DomainDBCursor; |
| | |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors = |
| | | new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>(); |
| | | private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>>(); |
| | | private ReplicationEnvironment replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ") |
| | | .append(String.valueOf(dbDirectory)); |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( |
| | | e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | | // double-check |
| | | currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | |
| | | catch (ChangelogException e) |
| | | { |
| | | logger.traceException(e); |
| | | logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage())); |
| | | logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | firstException = e; |
| | | } |
| | | |
| | | for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it = |
| | | this.domainToReplicaDBs.values().iterator(); it.hasNext();) |
| | | { |
| | |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | else |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | else |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | |
| | | catch (Exception e) |
| | | { |
| | | logger.traceException(e); |
| | | logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage())); |
| | | logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | return cnIndexDB; |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final Set<DN> excludedDomainDns = Collections.emptySet(); |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); |
| | |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) |
| | | throws ChangelogException |
| | | final Set<DN> excludedDomainDns) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | |
| | | /** |
| | | * Creates a {@link DomainDBCursor} for the given domain and records it in |
| | | * {@code registeredDomainCursors}. |
| | | * <p> |
| | | * The cursor is registered exactly once, through {@code putCursor}. No monitor is |
| | | * taken here: {@code putCursor} operates on a {@code ConcurrentSkipListMap} of |
| | | * {@code CopyOnWriteArrayList}s and resolves concurrent list creation itself via |
| | | * {@code putIfAbsent}. |
| | | * |
| | | * @param baseDN |
| | | *          the domain the cursor is created for; also the key under which it is registered |
| | | * @param matchingStrategy |
| | | *          passed unchanged to the {@link DomainDBCursor} constructor |
| | | * @param positionStrategy |
| | | *          passed unchanged to the {@link DomainDBCursor} constructor |
| | | * @return the newly created, registered cursor |
| | | */ |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | // Single registration point: adding the cursor to the per-domain list by hand as well |
| | | // would store it twice, and one remove() on unregistration would leave a stale entry. |
| | | putCursor(registeredDomainCursors, baseDN, cursor); |
| | | return cursor; |
| | | } |
| | | |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> cursors = replicaCursors.get(replicaId); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaId, cursors); |
| | | } |
| | | cursors.add(replicaCursor); |
| | | } |
| | | putCursor(replicaCursors, replicaId, replicaCursor); |
| | | |
| | | return replicaCursor; |
| | | } |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | /** |
| | | * Appends {@code cursor} to the list stored under {@code key} in {@code map}, |
| | | * creating and installing that list when the key has none yet. |
| | | * |
| | | * @param <K> |
| | | *          type of the map keys |
| | | * @param <V> |
| | | *          type of the cursors held in the per-key lists |
| | | * @param map |
| | | *          the map from key to the list of cursors recorded for that key |
| | | * @param key |
| | | *          the key whose list receives the cursor |
| | | * @param cursor |
| | | *          the cursor to append to the key's list |
| | | */ |
| | | private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor) |
| | | { |
| | | CopyOnWriteArrayList<V> cursors = map.get(key); |
| | | if (cursors == null) |
| | | { |
| | | cursors = new CopyOnWriteArrayList<V>(); |
| | | // putIfAbsent returns the list already mapped to the key, if there is one by now; |
| | | // in that case use it, so the cursor is not added to a list that never entered the map |
| | | CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors); |
| | | if (previousValue != null) |
| | | { |
| | | cursors = previousValue; |
| | | } |
| | | } |
| | | cursors.add(cursor); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | |
| | | else if (cursor instanceof DomainDBCursor) |
| | | { |
| | | final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | | synchronized (registeredMultiDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | else if (cursor instanceof ReplicaCursor) |
| | | { |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); |
| | | if (cursors != null) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | |
| | | |
| | | private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) |
| | | { |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); |
| | | if (cursors != null) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The thread purging the changelogDB on a regular interval. Records are |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e))); |
| | | logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e)); |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |