| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | |
| | | * <p> |
| | | * This is the only class that should have code using the BDB interfaces. |
| | | */ |
| | | public class ReplicationDB |
| | | class ReplicationDB |
| | | { |
| | | |
| | | private Database db; |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServer replicationServer; |
| | | private int serverId; |
| | | private DN baseDN; |
| | | private final ReplicationDbEnv dbEnv; |
| | | private final ReplicationServer replicationServer; |
| | | private final int serverId; |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The lock used to provide exclusive access to the thread that closes the db |
| | |
| | | * @param dbEnv The Db environment to use to create the db. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public ReplicationDB(int serverId, DN baseDN, |
| | | ReplicationDB(int serverId, DN baseDN, |
| | | ReplicationServer replicationServer, ReplicationDbEnv dbEnv) |
| | | throws ChangelogException |
| | | { |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public void addEntry(UpdateMsg change) throws ChangelogException |
| | | void addEntry(UpdateMsg change) throws ChangelogException |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | |
| | | } |
| | | |
| | | final DatabaseEntry key = createReplicationKey(change.getCSN()); |
| | | final DatabaseEntry data = new ReplicationData(change); |
| | | // Always keep messages in the replication DB with the current protocol |
| | | // version |
| | | final DatabaseEntry data = new DatabaseEntry(change.getBytes()); |
| | | |
| | | insertCounterRecordIfNeeded(change.getCSN()); |
| | | db.put(null, key, data); |
| | |
| | | /** |
| | | * Shutdown the database. |
| | | */ |
| | | public void shutdown() |
| | | void shutdown() |
| | | { |
| | | dbCloseLock.writeLock().lock(); |
| | | try |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public ReplServerDBCursor openReadCursor(CSN startCSN) |
| | | throws ChangelogException |
| | | ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(startCSN); |
| | | } |
| | |
| | | * |
| | | * @return The ReplServerDBCursor. |
| | | */ |
| | | public ReplServerDBCursor openDeleteCursor() throws ChangelogException |
| | | ReplServerDBCursor openDeleteCursor() throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(); |
| | | } |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public CSN readOldestCSN() throws ChangelogException |
| | | CSN readOldestCSN() throws ChangelogException |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public CSN readNewestCSN() throws ChangelogException |
| | | CSN readNewestCSN() throws ChangelogException |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Try to find in the DB, the CSN right before the one passed as a parameter. |
| | | * |
| | | * @param csn |
| | | * The CSN from which we start searching. |
| | | * @return the CSN right before the one passed as a parameter. Can return null |
| | | * if there is none. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public CSN getPreviousCSN(CSN csn) throws ChangelogException |
| | | { |
| | | // Nothing can precede an unspecified CSN. |
| | | if (csn == null) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | // Take the read side of the close lock before touching the db; it is |
| | | // handed back in the finally block via closeAndReleaseReadLock(). |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | Cursor cursor = null; |
| | | try |
| | | { |
| | | // If the DB has been closed then return immediately. |
| | | if (isDBClosed()) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | DatabaseEntry key = createReplicationKey(csn); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(null, null); |
| | | // Per the JE Cursor documentation, getSearchKeyRange positions the cursor |
| | | // on the smallest key greater than or equal to the supplied key. |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | // The cursor now sits on the CSN itself or on the first key after it. |
| | | // Step back one record to reach the change preceding the CSN. |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | // getRegularRecord() steps back once more if this is a counter record. |
| | | return getRegularRecord(cursor, key, data); |
| | | } |
| | | // else, there was no change previous to our CSN. |
| | | } |
| | | else |
| | | { |
| | | // No key at or after the CSN was found, so the candidate predecessor |
| | | // is the last record of the database (if the database is not empty). |
| | | // NOTE(review): no explicit comparison with the CSN is made here; this |
| | | // relies on the range search semantics described above - confirm. |
| | | if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | return getRegularRecord(cursor, key, data); |
| | | } |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // Surface the underlying database failure as a ChangelogException. |
| | | throw new ChangelogException(e); |
| | | } |
| | | finally |
| | | { |
| | | // Runs on every exit path, including the early returns above; cursor is |
| | | // still null here when the DB was found closed before it was opened. |
| | | closeAndReleaseReadLock(cursor); |
| | | } |
| | | // No previous change was found. |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Returns the CSN of the record the cursor is positioned on, unless that |
| | | * record is a counter record, in which case the cursor is moved back by one |
| | | * record and the CSN of that record is returned instead. |
| | | * |
| | | * @param cursor |
| | | * The cursor, already positioned on the record described by key. |
| | | * It is moved back by one record when a counter record is found. |
| | | * @param key |
| | | * The key of the current record; reused for the read of the |
| | | * previous record. |
| | | * @param data |
| | | * The data entry passed to the cursor when reading the previous |
| | | * record. |
| | | * @return the CSN of a non counter record, or null if the current record is |
| | | * a counter record and no record precedes it. |
| | | * @throws DatabaseException |
| | | * If a database problem happened |
| | | */ |
| | | private CSN getRegularRecord(Cursor cursor, DatabaseEntry key, |
| | | DatabaseEntry data) throws DatabaseException |
| | | { |
| | | final CSN csn = toCSN(key.getData()); |
| | | if (!isACounterRecord(csn)) |
| | | { |
| | | return csn; |
| | | } |
| | | |
| | | // There cannot be 2 counter records next to each other, |
| | | // it is safe to return the previous record, which must exist |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | | return toCSN(key.getData()); |
| | | } |
| | | |
| | | // the database only contains a counter record, which should not be possible |
| | | // let's just say no CSN |
| | | return null; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | * This Class implements a cursor that can be used to browse a |
| | | * replicationServer database. |
| | | */ |
| | | public class ReplServerDBCursor implements Closeable |
| | | class ReplServerDBCursor implements Closeable |
| | | { |
| | | /** |
| | | * The transaction that will protect the actions done with the cursor. |
| | |
| | | * (per the Cursor documentation). |
| | | * This should not be used in any other case. |
| | | */ |
| | | public void abort() |
| | | void abort() |
| | | { |
| | | synchronized (this) |
| | | { |
| | |
| | | * @throws ChangelogException |
| | | * In case of underlying database problem. |
| | | */ |
| | | public CSN nextCSN() throws ChangelogException |
| | | CSN nextCSN() throws ChangelogException |
| | | { |
| | | if (isClosed) |
| | | { |
| | |
| | | * |
| | | * @return the next UpdateMsg. |
| | | */ |
| | | public UpdateMsg next() |
| | | UpdateMsg next() |
| | | { |
| | | if (isClosed) |
| | | { |
| | |
| | | { |
| | | continue; |
| | | } |
| | | currentChange = ReplicationData.generateChange(data.getData()); |
| | | currentChange = (UpdateMsg) ReplicationMsg.generateMsg( |
| | | data.getData(), ProtocolVersion.getCurrentVersion()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | */ |
| | | Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD |
| | | .get(replicationServer.getServerId(), |
| | | (csn == null ? "" : csn.toString()), |
| | | (csn != null ? csn.toString() : ""), |
| | | e.getMessage()); |
| | | logError(message); |
| | | } |
| | |
| | | * |
| | | * @throws ChangelogException In case of database problem. |
| | | */ |
| | | public void delete() throws ChangelogException |
| | | void delete() throws ChangelogException |
| | | { |
| | | if (isClosed) |
| | | { |
| | |
| | | * |
| | | * @throws ChangelogException In case of database problem. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | void clear() throws ChangelogException |
| | | { |
| | | // The coming users will be blocked until the clear is done |
| | | dbCloseLock.writeLock().lock(); |
| | |
| | | * Encode the provided counter value in a database entry. |
| | | * @return The database entry with the counter value encoded inside. |
| | | */ |
| | | static private DatabaseEntry encodeCounterValue(int value) |
| | | private static DatabaseEntry encodeCounterValue(int value) |
| | | { |
| | | DatabaseEntry entry = new DatabaseEntry(); |
| | | entry.setData(getBytes(String.valueOf(value))); |