| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | | * server in the topology. |
| | | * Represents a replication server database for one server in the topology. |
| | | * <p> |
| | | * It is responsible for efficiently saving the updates that is received from |
| | | * each master server into stable storage. |
| | | * <p> |
| | | * This class is also able to generate a {@link DBCursor} that can be used to |
| | | * It is also able to generate a {@link DBCursor} that can be used to |
| | | * read all changes from a given {@link CSN}. |
| | | * <p> |
| | | * This class publish some monitoring information below cn=monitor. |
| | | * It publishes some monitoring information below cn=monitor. |
| | | */ |
| | | public class JEReplicaDB |
| | | class JEReplicaDB |
| | | { |
| | | |
| | | /** |
| | |
| | | this.oldestCSN = oldestCSN; |
| | | this.newestCSN = newestCSN; |
| | | } |
| | | |
| | | } |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | private ReplicationDB db; |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private volatile CSNLimits csnLimits; |
| | | private int serverId; |
| | | private DN baseDN; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private ReplicationServer replicationServer; |
| | | private final int serverId; |
| | | private final DN baseDN; |
| | | private final DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private final ReplicationServer replicationServer; |
| | | private final ReplicationDB db; |
| | | |
| | | /** |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | | * @param serverId The serverId for which changes will be stored in the DB. |
| | | * @param baseDN the baseDN for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this ReplicaDB. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @throws ChangelogException If a database problem happened |
| | | * @param serverId |
| | | * Id of this server. |
| | | * @param baseDN |
| | | * the replication domain baseDN. |
| | | * @param replicationServer |
| | | * The ReplicationServer that creates this ReplicaDB. |
| | | * @param replicationEnv |
| | | * the Database Env to use to create the ReplicationServer DB. server |
| | | * for this domain. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public JEReplicaDB(int serverId, DN baseDN, |
| | | ReplicationServer replicationServer, ReplicationDbEnv dbenv) |
| | | throws ChangelogException |
| | | JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer, |
| | | final ReplicationDbEnv replicationEnv) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | this.replicationServer = replicationServer; |
| | | this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv); |
| | | this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | |
| | | db.addEntry(updateMsg); |
| | | |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null |
| | | || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | | if (updateOld || updateNew) |
| | | { |
| | |
| | | * |
| | | * @return the oldest CSN that has not been purged yet. |
| | | */ |
| | | public CSN getOldestCSN() |
| | | CSN getOldestCSN() |
| | | { |
| | | return csnLimits.oldestCSN; |
| | | } |
| | |
| | | * |
| | | * @return the newest CSN that has not been purged yet. |
| | | */ |
| | | public CSN getNewestCSN() |
| | | CSN getNewestCSN() |
| | | { |
| | | return csnLimits.newestCSN; |
| | | } |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | | public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy) |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | return new JEReplicaDBCursor(db, startCSN, positionStrategy, this); |
| | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | public void shutdown() |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | |
| | | * |
| | | * @param purgeCSN |
| | | * The CSN up to which changes can be purged. No purging happens when |
| | | * it is null. |
| | | * it is {@code null}. |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * ReplicaDB. |
| | | * Implements monitoring capabilities of the ReplicaDB. |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | final List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | create(attributes, "replicationServer-database",String.valueOf(serverId)); |
| | | create(attributes, "domain-name", baseDN.toNormalizedString()); |
| | | final CSNLimits limits = csnLimits; |
| | |
| | | return attributes; |
| | | } |
| | | |
| | | private void create(List<Attribute> attributes, String name, String value) |
| | | private void create(final List<Attribute> attributes, final String name, final String value) |
| | | { |
| | | attributes.add(Attributes.create(name, value)); |
| | | } |
| | | |
| | | private String encode(CSN csn) |
| | | private String encode(final CSN csn) |
| | | { |
| | | return csn + " " + new Date(csn.getTime()); |
| | | } |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | ReplicationServerDomain domain = replicationServer |
| | | .getReplicationServerDomain(baseDN); |
| | | return "Changelog for DS(" + serverId + "),cn=" |
| | | + domain.getMonitorInstanceName(); |
| | | ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN); |
| | | return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException,InitializationException |
| | | throws ConfigException,InitializationException |
| | | { |
| | | // Nothing to do for now |
| | | } |
| | |
| | | * @throws ChangelogException When an exception occurs while removing the |
| | | * changes from the DB. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | void clear() throws ChangelogException |
| | | { |
| | | db.clear(); |
| | | csnLimits = new CSNLimits(null, null); |