opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -26,9 +26,7 @@ package org.opends.server.replication.server.changelog.file; import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -100,15 +98,12 @@ */ private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>(); /** * \@GuardedBy("itself") */ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = new HashMap<DN, List<DomainDBCursor>>(); private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors = new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>(); private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<MultiDomainDBCursor>(); private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR); private ReplicationEnvironment replicationEnv; private final ReplicationServerCfg config; private final File dbDirectory; @@ -274,6 +269,7 @@ // on the domainMap to create a new ReplicaDB synchronized (domainMap) { // double-check currentValue = domainMap.get(serverId); if (currentValue != null) { @@ -399,6 +395,7 @@ { firstException = e; } for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it = this.domainToReplicaDBs.values().iterator(); it.hasNext();) { @@ -644,7 +641,9 @@ catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage())); } } @@ -662,7 +661,7 @@ /** {@inheritDoc} */ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException { final Set<DN> excludedDomainDns = Collections.emptySet(); return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); @@ -672,8 +671,7 @@ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException final Set<DN> excludedDomainDns) throws ChangelogException { final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); registeredMultiDomainCursors.add(cursor); @@ -705,18 +703,9 @@ private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) { synchronized (registeredDomainCursors) { final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); if (cursors == null) { cursors = new ArrayList<DomainDBCursor>(); registeredDomainCursors.put(baseDN, cursors); } cursors.add(cursor); return cursor; } final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); putCursor(registeredDomainCursors, baseDN, cursor); return cursor; } private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) @@ -742,25 +731,31 @@ { final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); synchronized (replicaCursors) { List<ReplicaCursor> cursors = replicaCursors.get(replicaID); if (cursors == null) { cursors = new ArrayList<ReplicaCursor>(); replicaCursors.put(replicaID, cursors); } cursors.add(replicaCursor); } putCursor(replicaCursors, replicaId, replicaCursor); return replicaCursor; } return EMPTY_CURSOR_REPLICA_DB; } private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor) { CopyOnWriteArrayList<V> cursors = map.get(key); if (cursors == null) { cursors = new CopyOnWriteArrayList<V>(); CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors); if (previousValue != null) { cursors = previousValue; } } cursors.add(cursor); } /** {@inheritDoc} */ @Override public void unregisterCursor(final DBCursor<?> cursor) @@ -772,25 +767,19 @@ else if (cursor instanceof DomainDBCursor) { final DomainDBCursor domainCursor = (DomainDBCursor) cursor; synchronized (registeredMultiDomainCursors) final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); if (cursors != null) { final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); if (cursors != null) { cursors.remove(cursor); } cursors.remove(cursor); } } else if (cursor instanceof ReplicaCursor) { final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; synchronized (replicaCursors) final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); if (cursors != null) { final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); if (cursors != null) { cursors.remove(cursor); } cursors.remove(cursor); } } } @@ -853,15 +842,12 @@ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) { synchronized (replicaCursors) final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null) { final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null) for (ReplicaCursor cursor : cursors) { for (ReplicaCursor cursor : cursors) { cursor.setOfflineCSN(offlineCSN); } cursor.setOfflineCSN(offlineCSN); } } } opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -26,9 +26,7 @@ package org.opends.server.replication.server.changelog.je; import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -95,15 +93,12 @@ */ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); /** * \@GuardedBy("itself") */ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = new HashMap<DN, List<DomainDBCursor>>(); private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors = new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>(); private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<MultiDomainDBCursor>(); private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR); private ReplicationDbEnv replicationEnv; private final ReplicationServerCfg config; private final File dbDirectory; @@ -192,11 +187,6 @@ } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ") .append(String.valueOf(dbDirectory)); throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); @@ -724,13 +714,14 @@ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException final Set<DN> excludedDomainDns) throws ChangelogException { final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); registeredMultiDomainCursors.add(cursor); for (DN baseDN : domainToReplicaDBs.keySet()) { if (!excludedDomainDns.contains(baseDN)) { if (!excludedDomainDns.contains(baseDN)) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); } } @@ -755,18 +746,9 @@ private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) { synchronized (registeredDomainCursors) { final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); if (cursors == null) { cursors = new ArrayList<DomainDBCursor>(); registeredDomainCursors.put(baseDN, cursors); } cursors.add(cursor); return cursor; } final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); putCursor(registeredDomainCursors, baseDN, cursor); return cursor; } private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) @@ -792,25 +774,31 @@ { final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); synchronized (replicaCursors) { List<ReplicaCursor> cursors = replicaCursors.get(replicaID); if (cursors == null) { cursors = new ArrayList<ReplicaCursor>(); replicaCursors.put(replicaID, cursors); } cursors.add(replicaCursor); } putCursor(replicaCursors, replicaId, replicaCursor); return replicaCursor; } return EMPTY_CURSOR_REPLICA_DB; } private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor) { CopyOnWriteArrayList<V> cursors = map.get(key); if (cursors == null) { cursors = new CopyOnWriteArrayList<V>(); CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors); if (previousValue != null) { cursors = previousValue; } } cursors.add(cursor); } /** {@inheritDoc} */ @Override public void unregisterCursor(final DBCursor<?> cursor) @@ -822,19 +810,16 @@ else if (cursor instanceof DomainDBCursor) { final DomainDBCursor domainCursor = (DomainDBCursor) cursor; synchronized (registeredMultiDomainCursors) final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); if (cursors != null) { final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); if (cursors != null) { cursors.remove(cursor); } cursors.remove(cursor); } } else if (cursor instanceof ReplicaCursor) { final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); if (cursors != null) { cursors.remove(cursor); @@ -898,10 +883,10 @@ updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); } private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN) private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) { final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null && !cursors.isEmpty()) if (cursors != null) { for (ReplicaCursor cursor : cursors) {