| | |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private final String dbDirectoryName; |
| | | private ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | |
| | | * Guarded by cnIndexDBLock |
| | | */ |
| | | private JEChangeNumberIndexDB cnIndexDB; |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = |
| | | new AtomicReference<ChangeNumberIndexer>(); |
| | | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | | * @param dbDirName |
| | | * the directory for use by the replication database |
| | | * @param config |
| | | * the replication server configuration |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public JEChangelogDB(ReplicationServer replicationServer, String dbDirName) |
| | | throws ConfigException |
| | | public JEChangelogDB(ReplicationServer replicationServer, |
| | | ReplicationServerCfg config) throws ConfigException |
| | | { |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb"; |
| | | this.dbDirectory = makeDir(this.dbDirectoryName); |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | |
| | | { |
| | | try |
| | | { |
| | | dbEnv = new ReplicationDbEnv( |
| | | getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer); |
| | | initializeChangelogState(dbEnv.readChangelogState()); |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | initializeChangelogState(changelogState); |
| | | if (config.isComputeChangenumber()) |
| | | { |
| | | final ChangeNumberIndexer indexer = |
| | | new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | } |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | cnIndexer.compareAndSet(indexer, null); |
| | | } |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.clear(); |
| | | } |
| | | |
| | | for (DN baseDN : this.domainToReplicaDBs.keySet()) |
| | | { |
| | | removeDomain(baseDN); |
| | |
| | | @Override |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB; |
| | | if (cnIndexDB != null) |
| | | { |
| | | cnIndexDB.setPurgeDelay(delay); |
| | | } |
| | | for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setComputeChangeNumber(boolean computeChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer; |
| | | if (computeChangeNumber) |
| | | { |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | indexer = new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState.getCSN(serverId); |
| | | final CSN lastCSN = startAfterServerState != null ? |
| | | startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | } |
| | | return new CompositeDBCursor<Void>(cursors); |
| | |
| | | final boolean wasCreated = pair.getSecond(); |
| | | |
| | | replicaDB.add(updateMsg); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | | } |
| | | |