| | |
| | | import java.io.File; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | * <p> |
| | | * When removing a domainMap, calling code must: |
| | | * <ol> |
| | | * <li>first get the domainMap</li> |
| | | * <li>synchronize on the domainMap</li> |
| | | * <li>remove the domainMap</li> |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a JEReplicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>(); |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private final String dbDirectoryName; |
| | | private final File dbDirectory; |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | new DBCursor<UpdateMsg>() |
| | |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer rs) throws ChangelogException |
| | | { |
| | | synchronized (domainToReplicaDBs) |
| | | while (!shutdown.get()) |
| | | { |
| | | Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap == null) |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = |
| | | getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs); |
| | | if (result != null) |
| | | { |
| | | domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | domainToReplicaDBs.put(baseDN, domainMap); |
| | | return result; |
| | | } |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | } |
| | | |
| | | JEReplicaDB replicaDB = domainMap.get(serverId); |
| | | if (replicaDB == null) |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap( |
| | | DN baseDN) |
| | | { |
| | | replicaDB = |
| | | new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, replicaDB); |
| | | return Pair.of(replicaDB, true); |
| | | } |
| | | return Pair.of(replicaDB, false); |
| | | } |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = |
| | | domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | | } |
| | | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another |
| | | // thread |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = |
| | | new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = |
| | | domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer rs) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists, return it without taking any lock (paired with false: nothing was created here) |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the JEReplicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap, the same monitor the shutdown code holds while removing a domainMap, to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | | // double-check: another thread may have stored a JEReplicaDB for this serverId since the unsynchronized lookup above |
| | | currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | if (domainToReplicaDBs.get(baseDN) != domainMap) |
| | | { |
| | | // this domainMap is no longer the instance registered for baseDN: it could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Returning null (instead of a Pair) signals the caller to retry, which will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv); |
| | | domainMap.put(serverId, newValue); |
| | | return Pair.of(newValue, true); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | |
| | | @Override |
| | | public void shutdownDB() throws ChangelogException |
| | | { |
| | | if (!this.shutdown.compareAndSet(false, true)) |
| | | { // shutdown has already been initiated |
| | | return; |
| | | } |
| | | |
| | | // Remember the first exception because: |
| | | // - we want to try to remove everything we want to remove |
| | | // - then throw the first encountered exception |
| | |
| | | firstException = e; |
| | | } |
| | | |
| | | for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it = |
| | | this.domainToReplicaDBs.values().iterator(); it.hasNext();) |
| | | { |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next(); |
| | | synchronized (domainMap) |
| | | { |
| | | it.remove(); |
| | | innerShutdownDomain(domainMap); |
| | | } |
| | | } |
| | | |
| | | if (dbEnv != null) |
| | | { |
| | | dbEnv.shutdown(); |
| | |
| | | @Override |
| | | public void shutdownDomain(DN baseDN) |
| | | { |
| | | shutdownReplicaDBs(baseDN, getDomainMap(baseDN)); |
| | | if (this.shutdown.get()) |
| | | { // shutdown has already been initiated |
| | | return; |
| | | } |
| | | |
| | | private void shutdownReplicaDBs(DN baseDN, |
| | | Map<Integer, JEReplicaDB> domainMap) |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | synchronized (domainMap) |
| | | { |
| | | innerShutdownDomain(domainToReplicaDBs.remove(baseDN)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method assumes the domainMap is synchronized by calling code and that |
| | | * the domainMap is not null. |
| | | */ |
| | | private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap) |
| | | { |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | domainToReplicaDBs.remove(baseDN); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | ChangelogException firstException = null; |
| | | |
| | | // 1- clear the replica DBs |
| | | final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN); |
| | | Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | synchronized (domainMap) |
| | | { |
| | | domainMap = domainToReplicaDBs.remove(baseDN); |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | try |
| | |
| | | { |
| | | firstException = e; |
| | | } |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdownReplicaDBs(baseDN, domainMap); |
| | | } |
| | | } |
| | | |
| | | // 2- clear the ChangeNumber index DB |