/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.io.Closeable; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import com.sleepycat.je.*; import static com.sleepycat.je.LockMode.*; import static com.sleepycat.je.OperationStatus.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.util.StaticUtils.*; /** * This 
class implements the interface between the underlying database * and the JEReplicaDB class. *
* This is the only class that should have code using the BDB interfaces.
*/
public class ReplicationDB
{
/**
 * Index of the slot holding the result of the forward scan from the start
 * CSN in the two-element arrays used by count().
 */
private static final int START = 0;
/**
 * Index of the slot holding the result of the backward scan from the stop
 * CSN in the two-element arrays used by count().
 */
private static final int STOP = 1;
/**
 * The underlying JE database storing the changes. It is reset to null and
 * then re-created by clear(), hence it must be tested with isDBClosed()
 * before use.
 */
private Database db;
/** The Db environment used to create the db and to begin transactions. */
private ReplicationDbEnv dbenv;
/** The ReplicationServer owning this db; its server id is used in logs. */
private ReplicationServer replicationServer;
/** The identifier of the LDAP server whose changes are stored here. */
private int serverId;
/** The baseDN of the replication domain. */
private DN baseDN;
/**
 * The lock used to provide exclusive access to the thread that closes the db
 * (shutdown or clear).
 * Cursors and count() hold the read lock while they use the db, whereas
 * clear() takes the write lock. The lock is created in fair mode.
 */
private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
// Change counter management
// The Db itself does not provide a way to count the records between a start
// and an end change. And we cannot rely on the replication seqnum that is
// part of the CSN, since there can be holes (when an operation is canceled).
// Traversing all the records from the start one to the end one works fine
// but can take very long (ECL:lastChangeNumber).
//
// So we are storing special records in the DB (called counter records),
// that contain the number of changes since the previous counter record.
// One special record is :
// - a special key : changetime , serverid=0 seqnum=0
// - a counter value : count of changes since previous counter record.
//
// A counter record has to follow the order of the db, so it needs to have
// a CSN key that sorts at the right position.
// A counter record must have its own CSN key since the Db does not support
// duplicate keys (supporting them would break the compatibility of the DB).
//
// We define 2 conditions to store a counter record :
// 1/- at least 'counterWindowSize' changes have been stored in the Db
// since the previous counter record
// 2/- the change to be stored has a new timestamp - so that the counter
// record is the first record for this timestamp.
/**
 * Current value of the counter. It starts at 1 and is recomputed by
 * intializeCounters() from the last counter record found in the db plus the
 * number of records stored after it.
 */
private int counterCurrValue = 1;
/**
 * When not zero, the next change with a timestamp different from
 * counterTsLimit will lead to store a new counter record.
 * intializeCounters() sets it to the time of the last counter record found
 * in the db.
 * NOTE(review): the code that compares incoming changes against this value
 * is not visible in this part of the file (presumably addEntries) - confirm.
 */
private long counterTsLimit = 0;
/**
 * The counter record will never be written to the db more often than each
 * counterWindowSize changes. Can be changed with
 * setCounterRecordWindowSize() (for unit tests).
 */
private int counterWindowSize = 1000;
/**
 * Opens the database used to store and retrieve the changes of one LDAP
 * server, creating it when it does not exist yet, then restores the change
 * counters from the database content.
 *
 * @param serverId The identifier of the LDAP server.
 * @param baseDN The baseDN of the replication domain.
 * @param replicationServer The ReplicationServer that needs to be shutdown.
 * @param dbenv The Db environment to use to create the db.
 * @throws ChangelogException If a database problem happened.
 */
public ReplicationDB(int serverId, DN baseDN,
    ReplicationServer replicationServer, ReplicationDbEnv dbenv)
    throws ChangelogException
{
  this.replicationServer = replicationServer;
  this.dbenv = dbenv;
  this.baseDN = baseDN;
  this.serverId = serverId;

  // The domain is fetched (and created when missing) only to learn the
  // generation id the db must be opened with.
  final ReplicationServerDomain domain =
      replicationServer.getReplicationServerDomain(baseDN, true);
  this.db = dbenv.getOrAddDb(serverId, baseDN, domain.getGenerationId());

  intializeCounters();
}
/**
 * Restores the in-memory counter state from the content of the db.
 * The db is walked backwards from its last record until a counter record is
 * met: the counter restarts from the value stored in that record plus one,
 * plus the number of records that were stored after it. When the db holds
 * no counter record, the counter is one plus the number of records.
 *
 * @throws ChangelogException If a database problem happened.
 */
private void intializeCounters() throws ChangelogException
{
  this.counterCurrValue = 1;

  Cursor cursor = null;
  try
  {
    cursor = db.openCursor(null, null);

    final DatabaseEntry key = new DatabaseEntry();
    final DatabaseEntry data = new DatabaseEntry();
    int recordsAfterCounter = 0;

    for (OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
        status == OperationStatus.SUCCESS;
        status = cursor.getPrev(key, data, LockMode.DEFAULT))
    {
      final CSN csn = toCSN(key.getData());
      if (isACounterRecord(csn))
      {
        // Most recent counter record: restart counting from its value.
        counterCurrValue = decodeCounterValue(data.getData()) + 1;
        counterTsLimit = csn.getTime();
        break;
      }
      recordsAfterCounter++;
    }

    counterCurrValue += recordsAfterCounter;
  }
  catch (DatabaseException e)
  {
    throw new ChangelogException(e);
  }
  finally
  {
    close(cursor);
  }
}
/**
 * Builds the CSN whose UTF-8 encoded string form is held in the provided
 * bytes (a database key).
 *
 * @param data The UTF-8 bytes to decode.
 * @return The decoded CSN.
 */
private static CSN toCSN(byte[] data)
{
  final String csnAsString = decodeUTF8(data);
  return new CSN(csnAsString);
}
/**
* add a list of changes to the underlying db.
*
* @param changes The list of changes to add to the underlying db.
*/
public void addEntries(List
* Will be let null for a read cursor
*
* Will be set non null for a write cursor
*/
private final Transaction txn;
/**
 * The underlying JE cursor. It is null when this object was created as an
 * empty cursor (db closed, or start position not reachable).
 */
private final Cursor cursor;
/** The entry receiving the key of the record the cursor moves to. */
private final DatabaseEntry key;
/** The entry receiving the data of the record the cursor moves to. */
private final DatabaseEntry data;
/**
 * Set to true once close() or abort() has run, or immediately when this
 * object is created as an empty cursor.
 */
private boolean isClosed = false;
/**
 * Creates a ReplServerDBCursor that can be used for browsing a
 * replicationServer db. No transaction is attached to it (txn is null).
 * The read lock of dbCloseLock is taken here. When the constructor does not
 * end up holding a JE cursor (empty cursor or exception), the cursor and the
 * lock are handed to closeAndReleaseReadLock() before returning.
 * An empty, already closed cursor is created when the db is closed or when
 * no record could be found at or near the startCSN.
 *
 * @param startCSN
 *          The CSN from which the cursor must start. When null, the JE
 *          cursor is opened but not positioned.
 * @throws ChangelogException
 *           When the underlying database raises a DatabaseException.
 */
private ReplServerDBCursor(CSN startCSN) throws ChangelogException
{
if (startCSN != null)
{
key = createReplicationKey(startCSN);
}
else
{
key = new DatabaseEntry();
}
data = new DatabaseEntry();
txn = null;
// Take the lock. From now on, whatever error that happens in the life
// of this cursor should end by unlocking that lock. We must also
// unlock it when throwing an exception.
dbCloseLock.readLock().lock();
// Becomes true only once the cursor field holds a non null JE cursor: the
// finally block below then leaves the cursor open and the lock held.
boolean cursorHeld = false;
Cursor localCursor = null;
try
{
// If the DB has been closed then create empty cursor.
if (isDBClosed())
{
isClosed = true;
cursor = null;
return;
}
localCursor = db.openCursor(txn, null);
if (startCSN != null
&& localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
{
// We could not move the cursor to the expected startCSN:
// fall back to a range search from that key.
if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
{
// We could not even move the cursor close to it
// => return empty cursor
isClosed = true;
cursor = null;
return;
}
// We can move close to the startCSN.
// Let's create a cursor from that point: step back by one record using
// scratch entries, so that key and data keep what the range search found.
DatabaseEntry aKey = new DatabaseEntry();
DatabaseEntry aData = new DatabaseEntry();
if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
{
// There is no previous record: replace the cursor with a fresh,
// unpositioned one.
localCursor.close();
localCursor = db.openCursor(txn, null);
}
}
cursor = localCursor;
cursorHeld = cursor != null;
}
catch (DatabaseException e)
{
throw new ChangelogException(e);
}
finally
{
if (!cursorHeld)
{
// Empty cursor or failure: nothing will call close() to do the cleanup.
closeAndReleaseReadLock(localCursor);
}
}
}
/**
 * Creates a write ReplServerDBCursor: a transaction is begun on the db
 * environment and the JE cursor is opened within it.
 * The read lock of dbCloseLock is taken here. When the constructor does not
 * end up holding a JE cursor (empty cursor or exception), the cursor and the
 * lock are handed to closeAndReleaseReadLock() before returning.
 * An empty, already closed cursor is created when the db is closed.
 *
 * @throws ChangelogException
 *           When the transaction or the cursor could not be created. The
 *           transaction, if any, is aborted first.
 */
private ReplServerDBCursor() throws ChangelogException
{
key = new DatabaseEntry();
data = new DatabaseEntry();
// We'll go on only if no close or no clear is running
dbCloseLock.readLock().lock();
// Becomes true only once the cursor field holds a non null JE cursor: the
// finally block below then leaves the cursor open and the lock held.
boolean cursorHeld = false;
Transaction localTxn = null;
Cursor localCursor = null;
try
{
// If the DB has been closed then create empty cursor.
if (isDBClosed())
{
isClosed = true;
txn = null;
cursor = null;
return;
}
// Create the transaction that will protect whatever done with this
// write cursor.
localTxn = dbenv.beginTransaction();
localCursor = db.openCursor(localTxn, null);
txn = localTxn;
cursor = localCursor;
cursorHeld = cursor != null;
}
catch (ChangelogException e)
{
// Already the expected exception type: roll back and rethrow as is.
abort(localTxn);
throw e;
}
catch (Exception e)
{
// Anything else is rolled back and wrapped.
abort(localTxn);
throw new ChangelogException(e);
}
finally
{
if (!cursorHeld)
{
// Empty cursor or failure: nothing will call close() to do the cleanup.
closeAndReleaseReadLock(localCursor);
}
}
}
/**
 * Aborts the provided transaction when there is one. Database errors raised
 * by the abort itself are ignored.
 *
 * @param localTxn The transaction to abort, may be null.
 */
private void abort(Transaction localTxn)
{
  if (localTxn == null)
  {
    return;
  }
  try
  {
    localTxn.abort();
  }
  catch (DatabaseException ignored)
  {
    // Ignore.
  }
}
/**
 * Closes this cursor: the JE cursor is closed and the read lock released
 * through closeAndReleaseReadLock(), then the transaction, if any, is
 * committed without syncing. Only the first call to close() or abort() has
 * an effect. A database error during the commit is reported to the db
 * environment.
 */
@Override
public void close()
{
  final boolean alreadyClosed;
  synchronized (this)
  {
    alreadyClosed = isClosed;
    isClosed = true;
  }
  if (alreadyClosed)
  {
    return;
  }

  closeAndReleaseReadLock(cursor);

  if (txn == null)
  {
    return;
  }
  try
  {
    // No need for durability when purging.
    txn.commit(Durability.COMMIT_NO_SYNC);
  }
  catch (DatabaseException e)
  {
    dbenv.shutdownOnException(e);
  }
}
/**
 * Aborts this cursor; intended to be called after a deadlock exception and
 * not in any other case.
 * The JE cursor is closed and the read lock released through
 * closeAndReleaseReadLock(), then the transaction, if any, is aborted
 * instead of being committed. Only the first call to close() or abort() has
 * an effect. A database error during the abort is reported to the db
 * environment.
 */
public void abort()
{
  final boolean alreadyClosed;
  synchronized (this)
  {
    alreadyClosed = isClosed;
    isClosed = true;
  }
  if (alreadyClosed)
  {
    return;
  }

  closeAndReleaseReadLock(cursor);

  if (txn == null)
  {
    return;
  }
  try
  {
    txn.abort();
  }
  catch (DatabaseException e)
  {
    dbenv.shutdownOnException(e);
  }
}
/**
 * Moves this cursor to the next record and returns its key as a CSN.
 *
 * @return The next CSN in the database from this cursor, or null when this
 *         cursor is closed or has no more record.
 * @throws ChangelogException
 *           In case of underlying database problem.
 */
public CSN nextCSN() throws ChangelogException
{
  if (isClosed)
  {
    return null;
  }

  try
  {
    final OperationStatus status =
        cursor.getNext(key, data, LockMode.DEFAULT);
    return status == SUCCESS ? toCSN(key.getData()) : null;
  }
  catch (DatabaseException e)
  {
    throw new ChangelogException(e);
  }
}
/**
 * Gets the next UpdateMsg from this cursor. Counter records are skipped, as
 * are records that cannot be decoded (an error is logged for each of those).
 *
 * @return the next UpdateMsg, or null when this cursor is closed, when there
 *         is no more record, or when the database fails to move to the next
 *         record.
 */
public UpdateMsg next()
{
  if (isClosed)
  {
    return null;
  }

  for (;;)
  {
    final OperationStatus status;
    try
    {
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
    catch (DatabaseException e)
    {
      return null;
    }
    if (status != SUCCESS)
    {
      return null;
    }

    CSN csn = null;
    try
    {
      csn = toCSN(key.getData());
      if (!isACounterRecord(csn))
      {
        final UpdateMsg change =
            ReplicationData.generateChange(data.getData());
        if (change != null)
        {
          return change;
        }
      }
    }
    catch (Exception e)
    {
      /*
       * An error happened while converting the data read from the
       * replicationServer database to an Update Message. This can only
       * happen if the database is corrupted. There is not much more that we
       * can do at this point except trying to continue with the next
       * record. In such case, it is therefore possible that we miss some
       * changes.
       * TODO : This should be handled by the repair functionality.
       */
      Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
          .get(replicationServer.getServerId(),
              (csn == null ? "" : csn.toString()),
              e.getMessage());
      logError(message);
    }
  }
}
/**
 * Deletes the record this cursor is currently positioned on.
 *
 * @throws ChangelogException In case of database problem.
 * @throws IllegalStateException If this cursor is already closed.
 */
public void delete() throws ChangelogException
{
  if (isClosed)
  {
    throw new IllegalStateException("ReplServerDBCursor already closed");
  }

  try
  {
    cursor.delete();
  }
  catch (DatabaseException dbFailure)
  {
    throw new ChangelogException(dbFailure);
  }
}
}
/**
 * Clears this change DB from the changes it contains: the reference to the
 * serverId is removed from the db environment, the database is cleared and
 * then re-created. The write lock of dbCloseLock is held for the whole
 * operation. Nothing is done when the DB is already closed.
 * Any exception raised while clearing is logged and not rethrown.
 *
 * @throws ChangelogException In case of database problem.
 */
public void clear() throws ChangelogException
{
  // The coming users will be blocked until the clear is done
  dbCloseLock.writeLock().lock();
  try
  {
    if (!isDBClosed())
    {
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDN, serverId);

      // Forget the handle first, in case there's a failure between here and
      // the recreation.
      final Database previousDb = db;
      db = null;
      dbenv.clearDb(previousDb);

      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDN, -1);
    }
  }
  catch (Exception e)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_ERROR_CLEARING_DB.get(toString(),
        e.getMessage() + " " + stackTraceToSingleLineString(e)));
    logError(mb.toMessage());
  }
  finally
  {
    // Relax the waiting users
    dbCloseLock.writeLock().unlock();
  }
}
/**
 * Counts the number of changes between 2 CSNs (inclusive). The read lock of
 * dbCloseLock is held while counting.
 *
 * @param start The lower limit of the count.
 * @param stop The higher limit of the count.
 * @return The number of changes between provided start and stop CSN; the
 *         record count reported by the db when both are null. Returns 0
 *         when the DB is closed, is empty, or when an error occurs.
 */
public long count(CSN start, CSN stop)
{
  dbCloseLock.readLock().lock();
  try
  {
    if (isDBClosed())
    {
      return 0;
    }
    if (start == null && stop == null)
    {
      return db.count();
    }

    final int[] counters = new int[2];
    final int[] distances = new int[2];

    // Step 1 : from the start point, traverse db to the next counter record
    // or to the stop point.
    findFirstCounterRecordAfterStartPoint(start, stop, counters, distances);
    if (counters[START] == 0)
    {
      // No counter record was met: the distance walked is the answer.
      return distances[START];
    }

    // Step 2 : from the stop point, traverse db to the next counter record
    // or to the start point. A false result means the database is empty.
    final boolean dbHasRecords = findFirstCounterRecordBeforeStopPoint(
        start, stop, counters, distances);

    // Step 3 : Now consolidates the result
    return dbHasRecords ? computeDistance(counters, distances) : 0;
  }
  catch (DatabaseException e)
  {
    dbenv.shutdownOnException(e);
    return 0;
  }
  finally
  {
    dbCloseLock.readLock().unlock();
  }
}
/**
 * First step of count(): traverses the db forward from the start point
 * until a counter record is met or the stop point is passed.
 * On return, {@code distanceToCounterRecords[START]} has been incremented
 * once per regular change record visited, and {@code counterValues[START]}
 * holds the decoded value of the counter record that ended the traversal
 * (it is left untouched when no counter record was met).
 *
 * @param start
 *          The CSN to start from. When no record has exactly this key, the
 *          traversal starts from the closest key found by a range search.
 *          When null, the traversal starts from the first record of the db.
 * @param stop
 *          The CSN after which the traversal ends; a regular record newer
 *          than it is not counted.
 * @param counterValues
 *          Output array, only the START slot is written.
 * @param distanceToCounterRecords
 *          Output array, only the START slot is written.
 * @throws DatabaseException
 *           In case of underlying database problem.
 */
private void findFirstCounterRecordAfterStartPoint(CSN start, CSN stop,
    int[] counterValues, int[] distanceToCounterRecords)
    throws DatabaseException
{
  final Cursor cursor = db.openCursor(null, null);
  try
  {
    final DatabaseEntry data = new DatabaseEntry();
    final DatabaseEntry key;
    OperationStatus status;
    if (start != null)
    {
      key = createReplicationKey(start);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
      {
        status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
    }
    else
    {
      key = new DatabaseEntry();
      // The cursor is not positioned yet: getNext() would move it to the
      // first record of the db in that state, so ask for it explicitly.
      status = cursor.getFirst(key, data, LockMode.DEFAULT);
    }

    while (status == OperationStatus.SUCCESS)
    {
      // test whether the record is a regular change or a counter
      final CSN csn = toCSN(key.getData());
      if (isACounterRecord(csn))
      {
        // we have found the counter record
        counterValues[START] = decodeCounterValue(data.getData());
        break;
      }

      // it is a regular change record
      if (csn.isNewerThan(stop))
      {
        // we are outside the range: we reached the 'stop' target
        break;
      }

      distanceToCounterRecords[START]++;
      // loop to update the distance and possibly find a counter record
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
  }
  finally
  {
    close(cursor);
  }
}
/**
 * Second step of count(): traverses the db backward from the stop point
 * until a counter record is met or the start point is passed.
 * {@code distanceToCounterRecords[STOP]} is incremented once per regular
 * change record visited, and {@code counterValues[STOP]} receives the
 * decoded value of the counter record that ended the traversal, if any.
 * NOTE(review): when no record has exactly the stop key, the traversal
 * starts from the last record of the db rather than from the closest record
 * before stop - confirm this is intended.
 *
 * @param start
 *          The CSN before which the traversal ends; a regular record older
 *          than it is not counted.
 * @param stop
 *          The CSN to start from.
 * @param counterValues
 *          Output array, only the STOP slot is written.
 * @param distanceToCounterRecords
 *          Output array, only the STOP slot is written.
 * @return false when the traversal could not be positioned on any record
 *         (the database is empty), true otherwise.
 * @throws DatabaseException
 *           In case of underlying database problem.
 */
private boolean findFirstCounterRecordBeforeStopPoint(CSN start, CSN stop,
    int[] counterValues, int[] distanceToCounterRecords)
    throws DatabaseException
{
  final Cursor cursor = db.openCursor(null, null);
  try
  {
    DatabaseEntry key = createReplicationKey(stop);
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
    if (status != OperationStatus.SUCCESS)
    {
      // No record with the stop key: restart from the last record instead.
      key = new DatabaseEntry();
      data = new DatabaseEntry();
      status = cursor.getLast(key, data, LockMode.DEFAULT);
    }
    if (status != OperationStatus.SUCCESS)
    {
      return false;
    }

    for (; status == OperationStatus.SUCCESS;
        status = cursor.getPrev(key, data, LockMode.DEFAULT))
    {
      final CSN csn = toCSN(key.getData());
      if (isACounterRecord(csn))
      {
        // we have found the counter record
        counterValues[STOP] = decodeCounterValue(data.getData());
        break;
      }
      if (csn.isOlderThan(start))
      {
        // regular change outside the range: we reached the 'start' target
        break;
      }
      // regular change inside the range
      distanceToCounterRecords[STOP]++;
    }
    return true;
  }
  finally
  {
    close(cursor);
  }
}
/**
 * Consolidates the results of the two traversals done by count() into the
 * distance between the two CSNs. The diagram below shows a visual
 * description of how that distance is computed.
 *
 *     +--------+                        +--------+
 *     | CASE 1 |                        | CASE 2 |
 *     +--------+                        +--------+
 *
 *             CSN                               CSN
 *             -----                             -----
 *   START  => -----                   START  => -----
 *      ^      -----                      ^      -----
 *      |      -----                      |      -----
 *    dist 1   -----                    dist 1   -----
 *      |      -----                      |      -----
 *      v      -----                      v      -----
 *   CR 1&2 => [1000]                   CR 1  => [1000]
 *      ^      -----                             -----
 *      |      -----                             -----
 *    dist 2   -----                             -----
 *      |      -----                             -----
 *      v      -----                             -----
 *   STOP   => -----                             -----
 *             -----                             -----
 *     CR   => [2000]                   CR 2  => [2000]
 *             -----                      ^      -----
 *                                        |      -----
 *                                      dist 2   -----
 *                                        |      -----
 *                                        v      -----
 *                                     STOP   => -----
 *
 * Explanation of the terms used:
 * - START / STOP: the two CSNs the count is computed between.
 * - CR 1: the counter record met when walking forward from START; its
 *   value is {@code counterValues[START]}.
 * - CR 2: the counter record met when walking backward from STOP; its
 *   value is {@code counterValues[STOP]}.
 * - dist 1 / dist 2: the number of regular records walked to reach CR 1
 *   from START and CR 2 from STOP, that is
 *   {@code distanceToCounterRecords[START]} and
 *   {@code distanceToCounterRecords[STOP]}.
 *
 * @param counterValues
 *          The counter values found by the two traversals.
 * @param distanceToCounterRecords
 *          The distances walked by the two traversals.
 * @return dist 1 + (CR 2 - CR 1) + dist 2, or 0 when
 *         {@code counterValues[START]} is 0.
 */
private long computeDistance(int[] counterValues,
    int[] distanceToCounterRecords)
{
  if (counterValues[START] == 0)
  {
    return 0;
  }

  // CASE 1: both traversals ended on the same counter value, the gap is 0
  // and only the two distances remain.
  // CASE 2: at least 2 counter records lie between from and to, the gap
  // between their values accounts for the changes stored in between.
  final int gapBetweenCounters = counterValues[STOP] - counterValues[START];
  return distanceToCounterRecords[START]
      + gapBetweenCounters
      + distanceToCounterRecords[STOP];
}
/**
 * Whether a provided CSN is the key of a counter record, that is a CSN
 * whose serverId and seqnum are both 0.
 *
 * @param csn
 *          The CSN to test
 * @return true if the provided CSN is a counter record, false if the change
 *         is a regular/normal change that was performed on the replica.
 */
private static boolean isACounterRecord(CSN csn)
{
  if (csn.getServerId() != 0)
  {
    return false;
  }
  return csn.getSeqnum() == 0;
}
/**
 * Builds the key of a counter record carrying the time of the provided CSN,
 * with both serverId and seqnum set to 0.
 *
 * @param csn The CSN providing the time.
 * @return The CSN to use as key of the counter record.
 */
private static CSN newCounterRecord(CSN csn)
{
  final long changeTime = csn.getTime();
  return new CSN(changeTime, 0, 0);
}
/**
 * Decodes the provided database entry as the value of a counter: the bytes
 * are read as the UTF-8 decimal representation of an int.
 *
 * @param entry The provided entry.
 * @return The counter value.
 */
private static int decodeCounterValue(byte[] entry)
{
  return Integer.parseInt(decodeUTF8(entry));
}
/**
 * Encodes the provided counter value in a database entry, as the bytes of
 * its decimal string representation.
 *
 * @param value The counter value to encode.
 * @return The database entry with the counter value encoded inside.
 */
static private DatabaseEntry encodeCounterValue(int value)
{
  final String valueAsString = String.valueOf(value);
  final DatabaseEntry encoded = new DatabaseEntry();
  encoded.setData(getBytes(valueAsString));
  return encoded;
}
/**
 * Changes the size of the counter writing window, that is the minimum
 * number of changes between two counter records (public method for unit
 * tests only).
 *
 * @param size Size in number of records.
 */
public void setCounterRecordWindowSize(int size)
{
  counterWindowSize = size;
}
/**
 * Returns {@code true} if the DB is closed, that is when there is no db
 * handle or when the environment of the db is no longer valid. This method
 * assumes that the caller holds either the read lock or the write lock of
 * dbCloseLock.
 */
private boolean isDBClosed()
{
  if (db == null)
  {
    return true;
  }
  return !db.getEnvironment().isValid();
}
}