| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.ByteString; |
| | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final LogFile<CSN, UpdateMsg> logFile; |
| | | private final Log<CSN, UpdateMsg> log; |
| | | |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | |
| | | this.baseDN = baseDN; |
| | | this.replicationServer = replicationServer; |
| | | this.replicationEnv = replicationEnv; |
| | | this.logFile = createLogFile(replicationEnv); |
| | | this.log = createLog(replicationEnv); |
| | | this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN()); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | |
| | | |
| | | private CSN readOldestCSN() throws ChangelogException |
| | | { |
| | | final Record<CSN, UpdateMsg> record = logFile.getOldestRecord(); |
| | | final Record<CSN, UpdateMsg> record = log.getOldestRecord(); |
| | | return record == null ? null : record.getKey(); |
| | | } |
| | | |
| | | private CSN readNewestCSN() throws ChangelogException |
| | | { |
| | | final Record<CSN, UpdateMsg> record = logFile.getNewestRecord(); |
| | | final Record<CSN, UpdateMsg> record = log.getNewestRecord(); |
| | | return record == null ? null : record.getKey(); |
| | | } |
| | | |
| | | private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true); |
| | | return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId()); |
| | |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId))); |
| | | } |
| | | logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg)); |
| | | log.append(Record.from(updateMsg.getCSN(), updateMsg)); |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | |
| | | */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN); |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN); |
| | | return new FileReplicaDBCursor(cursor, startAfterCSN); |
| | | } |
| | | |
| | |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | logFile.close(); |
| | | log.close(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // TODO : no purge implemented yet, as we have a single-file log. |
| | | // The purge must be implemented once we handle a log with multiple files. |
| | | // The purge will only delete whole files. |
| | | final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN); |
| | | if (oldestRecord != null) |
| | | { |
| | | csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | void clear() throws ChangelogException |
| | | { |
| | | // Remove all persisted data and reset generationId to default value |
| | | logFile.clear(); |
| | | log.clear(); |
| | | replicationEnv.resetGenerationId(baseDN); |
| | | |
| | | csnLimits = new CSNLimits(null, null); |
| | |
| | | */ |
| | | long getNumberRecords() throws ChangelogException |
| | | { |
| | | return logFile.getNumberOfRecords(); |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** Parser of records persisted in the ReplicaDB log. */ |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | @Override |
| | | public ByteString encodeRecord(CSN key, UpdateMsg message) |
| | | public ByteString encodeRecord(final Record<CSN, UpdateMsg> record) |
| | | { |
| | | final UpdateMsg message = record.getValue(); |
| | | try |
| | | { |
| | | return ByteString.wrap(message.getBytes()); |
| | |
| | | throw new DecodingException(e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return new CSN(key); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String encodeKeyToString(CSN key) |
| | | { |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getMaxKey() |
| | | { |
| | | return CSN.MAX_CSN_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |