| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.io.Closeable; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.concurrent.locks.ReadWriteLock; |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | |
| | | |
| | | private DatabaseEntry createReplicationKey(CSN csn) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | try |
| | | final DatabaseEntry key = new DatabaseEntry(); |
| | | if (csn != null) |
| | | { |
| | | key.setData(csn.toString().getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // Should never happens, UTF-8 is always supported |
| | | // TODO : add better logging |
| | | try |
| | | { |
| | | key.setData(csn.toString().getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // Should never happens, UTF-8 is always supported |
| | | // TODO : add better logging |
| | | } |
| | | } |
| | | return key; |
| | | } |
| | |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start.If null, start from the |
| | | * oldest CSN |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * @return The ReplServerDBCursor. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException |
| | | ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(startCSN); |
| | | return new ReplServerDBCursor(startCSN, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |
| | | * This Class implements a cursor that can be used to browse a |
| | | * replicationServer database. |
| | | */ |
| | | class ReplServerDBCursor implements Closeable |
| | | class ReplServerDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | /** |
| | | * The transaction that will protect the actions done with the cursor. |
| | |
| | | * <p> |
| | | * Will be set non null for a write cursor |
| | | */ |
| | | private final Transaction txn; |
| | | private final Cursor cursor; |
| | | private final DatabaseEntry key; |
| | | private final DatabaseEntry data; |
| | | /** \@Null for read cursors, \@NotNull for deleting cursors. */ |
| | | private final Transaction txn; |
| | | private UpdateMsg currentRecord; |
| | | |
| | | private boolean isClosed = false; |
| | | private boolean isClosed; |
| | | |
| | | /** |
| | | * Creates a ReplServerDBCursor that can be used for browsing a |
| | |
| | | * |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * @throws ChangelogException |
| | | * When the startCSN does not exist. |
| | | */ |
| | | private ReplServerDBCursor(CSN startCSN) throws ChangelogException |
| | | private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | if (startCSN != null) |
| | | { |
| | | key = createReplicationKey(startCSN); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | } |
| | | key = createReplicationKey(startCSN); |
| | | data = new DatabaseEntry(); |
| | | |
| | | txn = null; |
| | | |
| | | // Take the lock. From now on, whatever error that happen in the life |
| | |
| | | return; |
| | | } |
| | | |
| | | // We can move close to the startCSN. |
| | | // Let's create a cursor from that point. |
| | | DatabaseEntry aKey = new DatabaseEntry(); |
| | | DatabaseEntry aData = new DatabaseEntry(); |
| | | if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS) |
| | | if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY) |
| | | { |
| | | localCursor.close(); |
| | | localCursor = db.openCursor(txn, null); |
| | | // We can move close to the startCSN. |
| | | // Let's create a cursor from that point. |
| | | key.setData(null); |
| | | if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | localCursor.close(); |
| | | localCursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | } |
| | | cursor = localCursor; |
| | | cursorHeld = cursor != null; |
| | | |
| | | if (key.getData() != null) |
| | | { |
| | | computeCurrentRecord(); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | return; |
| | | } |
| | | isClosed = true; |
| | | currentRecord = null; |
| | | } |
| | | |
| | | closeAndReleaseReadLock(cursor); |
| | |
| | | return null; |
| | | } |
| | | |
| | | currentRecord = null; |
| | | try |
| | | { |
| | | if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS) |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the next UpdateMsg from this cursor. |
| | | * |
| | | * @return the next UpdateMsg. |
| | | */ |
| | | UpdateMsg next() |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | return false; |
| | | } |
| | | |
| | | UpdateMsg currentChange = null; |
| | | while (currentChange == null) |
| | | currentRecord = null; |
| | | while (currentRecord == null) |
| | | { |
| | | try |
| | | { |
| | | if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | return null; |
| | | return false; |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | return null; |
| | | throw new ChangelogException(e); |
| | | } |
| | | |
| | | CSN csn = null; |
| | | try |
| | | { |
| | | csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | continue; |
| | | } |
| | | currentChange = (UpdateMsg) ReplicationMsg.generateMsg( |
| | | data.getData(), ProtocolVersion.getCurrentVersion()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | /* |
| | | * An error happening trying to convert the data from the |
| | | * replicationServer database to an Update LocalizableMessage. This can only |
| | | * happen if the database is corrupted. There is not much more that we |
| | | * can do at this point except trying to continue with the next |
| | | * record. In such case, it is therefore possible that we miss some |
| | | * changes. |
| | | * TODO : This should be handled by the repair functionality. |
| | | */ |
| | | logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(), |
| | | csn, e.getMessage()); |
| | | } |
| | | computeCurrentRecord(); |
| | | } |
| | | return currentChange; |
| | | return currentRecord != null; |
| | | } |
| | | |
| | | private void computeCurrentRecord() |
| | | { |
| | | CSN csn = null; |
| | | try |
| | | { |
| | | csn = toCSN(key.getData()); |
| | | if (isACounterRecord(csn)) |
| | | { |
| | | return; |
| | | } |
| | | currentRecord = toRecord(data.getData()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | /* |
| | | * An error happening trying to convert the data from the |
| | | * replicationServer database to an Update Message. This can only |
| | | * happen if the database is corrupted. There is not much more that we |
| | | * can do at this point except trying to continue with the next |
| | | * record. In such case, it is therefore possible that we miss some |
| | | * changes. |
| | | * TODO : This should be handled by the repair functionality. |
| | | */ |
| | | logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(), |
| | | csn, e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private UpdateMsg toRecord(final byte[] data) throws Exception |
| | | { |
| | | final short currentVersion = ProtocolVersion.getCurrentVersion(); |
| | | return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentRecord; |
| | | } |
| | | |
| | | /** |