mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.56.2014 84cecd3711ddbbd60132cdd80957e387f23cf63e
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /** The log in which records are persisted. */
  private final LogFile<CSN, UpdateMsg> logFile;
  private final Log<CSN, UpdateMsg> log;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
    this.baseDN = baseDN;
    this.replicationServer = replicationServer;
    this.replicationEnv = replicationEnv;
    this.logFile = createLogFile(replicationEnv);
    this.log = createLog(replicationEnv);
    this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
  private CSN readOldestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
    return record == null ? null : record.getKey();
  }
  private CSN readNewestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
    return record == null ? null : record.getKey();
  }
  private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
    return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
    log.append(Record.from(updateMsg.getCSN(), updateMsg));
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
   */
  DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
  {
    LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
    return new FileReplicaDBCursor(cursor, startAfterCSN);
  }
@@ -250,7 +250,7 @@
  {
    if (shutdown.compareAndSet(false, true))
    {
      logFile.close();
      log.close();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
@@ -270,10 +270,11 @@
    {
      return;
    }
    // TODO : no purge implemented yet, as we have a single-file log.
    // The purge must be implemented once we handle a log with multiple files.
    // The purge will only delete whole files.
    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
    if (oldestRecord != null)
    {
      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
    }
  }
  /**
@@ -346,7 +347,7 @@
  void clear() throws ChangelogException
  {
    // Remove all persisted data and reset generationId to default value
    logFile.clear();
    log.clear();
    replicationEnv.resetGenerationId(baseDN);
    csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
   */
  long getNumberRecords() throws ChangelogException
  {
    return logFile.getNumberOfRecords();
    return log.getNumberOfRecords();
  }
  /** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
    private static final DebugTracer TRACER = getTracer();
    @Override
    public ByteString encodeRecord(CSN key, UpdateMsg message)
    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
    {
      final UpdateMsg message = record.getValue();
      try
      {
        return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
        throw new DecodingException(e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public CSN decodeKeyFromString(String key) throws ChangelogException
    {
      return new CSN(key);
    }
    /** {@inheritDoc} */
    @Override
    public String encodeKeyToString(CSN key)
    {
      return key.toString();
    }
    /** {@inheritDoc} */
    @Override
    public CSN getMaxKey()
    {
      return CSN.MAX_CSN_VALUE;
    }
  }
}