| | |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | /** |
| | | * Logfile-based implementation of a ChangeNumberIndexDB. |
| | | * Implementation of a ChangeNumberIndexDB with a log. |
| | | * <p> |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | |
| | | static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser(); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final LogFile<Long, ChangeNumberIndexRecord> logFile; |
| | | private final Log<Long, ChangeNumberIndexRecord> log; |
| | | |
| | | /** |
| | | * The newest changenumber stored in the DB. It is used to avoid purging the |
| | |
| | | */ |
| | | FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | logFile = replicationEnv.getOrCreateCNIndexDB(); |
| | | log = replicationEnv.getOrCreateCNIndexDB(); |
| | | final ChangeNumberIndexRecord newestRecord = readLastRecord(); |
| | | newestChangeNumber = getChangeNumber(newestRecord); |
| | | // initialization of the lastGeneratedChangeNumber from the DB content |
| | |
| | | |
| | | private ChangeNumberIndexRecord readLastRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord(); |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord(); |
| | | return record == null ? null : record.getValue(); |
| | | } |
| | | |
| | | private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord(); |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord(); |
| | | return record == null ? null : record.getValue(); |
| | | } |
| | | |
| | |
| | | final long changeNumber = nextChangeNumber(); |
| | | final ChangeNumberIndexRecord newRecord = |
| | | new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN()); |
| | | logFile.addRecord(newRecord.getChangeNumber(), newRecord); |
| | | log.append(Record.from(newRecord.getChangeNumber(), newRecord)); |
| | | newestChangeNumber = changeNumber; |
| | | |
| | | if (debugEnabled()) |
| | |
| | | */ |
| | | long count() throws ChangelogException |
| | | { |
| | | return logFile.getNumberOfRecords(); |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException |
| | | { |
| | | return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber)); |
| | | return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber)); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | logFile.close(); |
| | | log.close(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | // TODO : no purge implemented yet as implementation is based on a single-file log. |
| | | // The purge must be implemented once we handle a log with multiple files. |
| | | // The purge will only delete whole files. |
| | | return null; |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime()); |
| | | return record != null ? record.getValue().getCSN() : null; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | logFile.clear(); |
| | | log.clear(); |
| | | newestChangeNumber = NO_KEY; |
| | | } |
| | | |
| | |
| | | private static final byte STRING_SEPARATOR = 0; |
| | | |
| | | @Override |
| | | public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record) |
| | | public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record) |
| | | { |
| | | final ChangeNumberIndexRecord cnIndexRecord = record.getValue(); |
| | | return new ByteStringBuilder() |
| | | .append(changeNumber) |
| | | .append(record.getPreviousCookie()) |
| | | .append(record.getKey()) |
| | | .append(cnIndexRecord.getPreviousCookie()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(record.getBaseDN().toString()) |
| | | .append(cnIndexRecord.getBaseDN().toString()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(record.getCSN().toByteString()).toByteString(); |
| | | .append(cnIndexRecord.getCSN().toByteString()).toByteString(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | return length; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Long decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return Long.valueOf(key); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String encodeKeyToString(Long key) |
| | | { |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Long getMaxKey() |
| | | { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |