| | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<DN, Map<Integer, DbHandler>> sourceDbHandlers = |
| | | new ConcurrentHashMap<DN, Map<Integer, DbHandler>>(); |
| | | private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private final String dbDirectoryName; |
| | | private final File dbDirectory; |
| | |
| | | } |
| | | } |
| | | |
| | | private Map<Integer, DbHandler> getDomainMap(DN baseDN) |
| | | private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | return domainMap; |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private DbHandler getDbHandler(DN baseDN, int serverId) |
| | | private JEReplicaDB getReplicaDB(DN baseDN, int serverId) |
| | | { |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateDbHandler(baseDN, serverId, rs); |
| | | getOrCreateReplicaDB(baseDN, serverId, rs); |
| | | } |
| | | |
| | | /** |
| | | * Returns a DbHandler, possibly creating it. |
| | | * Returns a {@link JEReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN for which to create a DbHandler |
| | | * the baseDN for which to create a ReplicaDB |
| | | * @param serverId |
| | | * the baseserverId for which to create a DbHandler |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param rs |
| | | * the ReplicationServer |
| | | * @return a Pair with the DbHandler and a a boolean indicating if it has been |
| | | * created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<DbHandler, Boolean> getOrCreateDbHandler(DN baseDN, |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer rs) throws ChangelogException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | synchronized (domainToReplicaDBs) |
| | | { |
| | | Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap == null) |
| | | { |
| | | domainMap = new ConcurrentHashMap<Integer, DbHandler>(); |
| | | sourceDbHandlers.put(baseDN, domainMap); |
| | | domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | domainToReplicaDBs.put(baseDN, domainMap); |
| | | } |
| | | |
| | | DbHandler dbHandler = domainMap.get(serverId); |
| | | if (dbHandler == null) |
| | | JEReplicaDB replicaDB = domainMap.get(serverId); |
| | | if (replicaDB == null) |
| | | { |
| | | dbHandler = |
| | | new DbHandler(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, dbHandler); |
| | | return Pair.of(dbHandler, true); |
| | | replicaDB = |
| | | new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, replicaDB); |
| | | return Pair.of(replicaDB, true); |
| | | } |
| | | return Pair.of(dbHandler, false); |
| | | return Pair.of(replicaDB, false); |
| | | } |
| | | } |
| | | |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | for (DN baseDN : this.sourceDbHandlers.keySet()) |
| | | for (DN baseDN : this.domainToReplicaDBs.keySet()) |
| | | { |
| | | removeDomain(baseDN); |
| | | } |
| | |
| | | @Override |
| | | public long getCount(DN baseDN, CSN from, CSN to) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDN, from.getServerId()); |
| | | if (dbHandler != null) |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, from.getServerId()); |
| | | if (replicaDB != null) |
| | | { |
| | | return dbHandler.getCount(from, to); |
| | | return replicaDB.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | |
| | | public long getDomainChangesCount(DN baseDN) |
| | | { |
| | | long entryCount = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | entryCount += dbHandler.getChangesCount(); |
| | | entryCount += replicaDB.getChangesCount(); |
| | | } |
| | | return entryCount; |
| | | } |
| | |
| | | @Override |
| | | public void shutdownDomain(DN baseDN) |
| | | { |
| | | shutdownDbHandlers(getDomainMap(baseDN)); |
| | | sourceDbHandlers.remove(baseDN); |
| | | shutdownReplicaDBs(getDomainMap(baseDN)); |
| | | domainToReplicaDBs.remove(baseDN); |
| | | } |
| | | |
| | | private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap) |
| | | private void shutdownReplicaDBs(Map<Integer, JEReplicaDB> domainMap) |
| | | { |
| | | synchronized (domainMap) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | dbHandler.shutdown(); |
| | | replicaDB.shutdown(); |
| | | } |
| | | domainMap.clear(); |
| | | } |
| | |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | result.update(dbHandler.getOldestCSN()); |
| | | result.update(replicaDB.getOldestCSN()); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | result.update(dbHandler.getNewestCSN()); |
| | | result.update(replicaDB.getNewestCSN()); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | ChangelogException firstException = null; |
| | | |
| | | // 1- clear the replica DBs |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); |
| | | final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN); |
| | | synchronized (domainMap) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | try |
| | | { |
| | | dbHandler.clear(); |
| | | replicaDB.clear(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | firstException = e; |
| | | } |
| | | } |
| | | shutdownDbHandlers(domainMap); |
| | | sourceDbHandlers.remove(baseDN); |
| | | shutdownReplicaDBs(domainMap); |
| | | domainToReplicaDBs.remove(baseDN); |
| | | } |
| | | |
| | | // 2- clear the ChangeNumber index DB |
| | |
| | | @Override |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values()) |
| | | for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | dbHandler.setPurgeDelay(delay); |
| | | replicaDB.setPurgeDelay(delay); |
| | | } |
| | | } |
| | | } |
| | |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | if (latest == 0 || latest < dbHandler.getLatestTrimDate()) |
| | | if (latest == 0 || latest < replicaDB.getLatestTrimDate()) |
| | | { |
| | | latest = dbHandler.getLatestTrimDate(); |
| | | latest = replicaDB.getLatestTrimDate(); |
| | | } |
| | | } |
| | | return latest; |
| | |
| | | @Override |
| | | public CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | | { |
| | | cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | if (cursor != null && cursor.getChange() != null) |
| | | { |
| | | return cursor.getChange().getCSN(); |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new DraftCNDbHandler(replicationServer, this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | if (dbHandler != null) |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | try |
| | | { |
| | | ReplicaDBCursor cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); |
| | | cursor.next(); |
| | | return cursor; |
| | | } |
| | |
| | | public boolean publishUpdateMsg(DN baseDN, int serverId, |
| | | UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final Pair<DbHandler, Boolean> pair = |
| | | getOrCreateDbHandler(baseDN, serverId, replicationServer); |
| | | final DbHandler dbHandler = pair.getFirst(); |
| | | final Pair<JEReplicaDB, Boolean> pair = |
| | | getOrCreateReplicaDB(baseDN, serverId, replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | |
| | | dbHandler.add(updateMsg); |
| | | replicaDB.add(updateMsg); |
| | | return wasCreated; |
| | | } |
| | | |