| | |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | |
| | | * @GuardedBy("cnIndexDBLock") |
| | | */ |
| | | private JEChangeNumberIndexDB cnIndexDB; |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = |
| | | new AtomicReference<ChangeNumberIndexer>(); |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>(); |
| | | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | |
| | | * older than this delay might be removed. |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = |
| | | new AtomicReference<ChangelogDBPurger>(); |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | | { |
| | | // Check that this path exists or create it. |
| | | File dbDirectory = getFileForPath(dbDirName); |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | | try |
| | | { |
| | | if (!dbDirectory.exists()) |
| | |
| | | * the baseDN for which to create a ReplicaDB |
| | | * @param serverId |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param rs |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer rs) throws ChangelogException |
| | | int serverId, ReplicationServer server) throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | { |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = |
| | | getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs); |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | return result; |
| | |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer rs) throws ChangelogException |
| | | DN baseDN, ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | |
| | | |
| | | if (domainToReplicaDBs.get(baseDN) != domainMap) |
| | | { |
| | | // the domainMap could have been concurrently removed because |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv); |
| | | domainMap.put(serverId, newValue); |
| | | return Pair.of(newValue, true); |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | | } |
| | | |
| | |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | initializeChangelogState(changelogState); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | | startIndexer(changelogState); |
| | |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | |
| | | logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), |
| | | e.getLocalizedMessage())); |
| | | } |
| | | logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | |
| | | private void initializeChangelogState(final ChangelogState changelogState) |
| | | private void initializeToChangelogState(final ChangelogState changelogState) |
| | | throws ChangelogException |
| | | { |
| | | for (Map.Entry<DN, Long> entry : |
| | | changelogState.getDomainToGenerationId().entrySet()) |
| | | for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet()) |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true) |
| | | .initGenerationID(entry.getValue()); |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<DN, List<Integer>> entry : |
| | | changelogState.getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | |
| | | firstException = e; |
| | | } |
| | | else if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | cnIndexDB = null; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setPurgeDelay(long purgeDelayInMillis) |
| | | public void setPurgeDelay(final long purgeDelayInMillis) |
| | | { |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | final ChangelogDBPurger purger; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setComputeChangeNumber(boolean computeChangeNumber) |
| | | public void setComputeChangeNumber(final boolean computeChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | if (computeChangeNumber) |
| | |
| | | |
| | | private void startIndexer(final ChangelogState changelogState) |
| | | { |
| | | final ChangeNumberIndexer indexer = |
| | | new ChangeNumberIndexer(this, changelogState); |
| | | final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | |
| | | |
| | | /** {@inheritDoc} Note: {@code baseDN} is not consulted here; the current value of the {@code latestPurgeDate} field is returned as is. */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | public long getDomainLatestTrimDate(final DN baseDN) |
| | | { |
| | | return latestPurgeDate; |
| | | } |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, |
| | | ServerState startAfterServerState) throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? |
| | | startAfterServerState.getCSN(serverId) : null; |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException |
| | | public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | dbEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | |
| | | |
| | | /** |
| | | * The thread purging the changelogDB on a regular interval. Records are |
| | | * purged from the changelogDB is they are older than a delay specified in |
| | | * purged from the changelogDB if they are older than a delay specified in |
| | | * seconds. The purge process works in two steps: |
| | | * <ol> |
| | | * <li>first purge the changeNumberIndexDB and retrieve information to drive |
| | |
| | | } |
| | | } |
| | | |
| | | for (final Map<Integer, JEReplicaDB> domainMap |
| | | : domainToReplicaDBs.values()) |
| | | for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (final JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(e))); |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e))); |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |