/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.util.List; import java.io.UnsupportedEncodingException; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.DataFormatException; import com.sleepycat.je.Cursor; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Database; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; /** * This class implements the interface between the underlying database * and the dbHandler class. * This is the only class that should have code using the BDB interfaces. */ public class ReplicationDB { private Database db = null; private ReplicationDbEnv dbenv = null; private ReplicationServer replicationServer; private int serverId; private String baseDn; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; // The lock used to provide exclusive access to the thread that // close the db (shutdown or clear). private ReentrantReadWriteLock dbCloseLock; // Change counter management // The Db itself does not allow to count records between a start and an end // change. And we cannot rely on the replication seqnum that is part of the // changenumber, since there can be holes (when an operation is canceled). // And traversing all the records from the start one to the end one works // fine but can be very long (ECL:lastChangeNumber). // // So we are storing special records in the DB (called counter records), // that contain the number of changes since the previous counter record. // One special record is : // - a special key : changetime , serverid=0 seqnum=0 // - a counter value : count of changes since previous counter record. // // A counter record has to follow the order of the db, so it needs to have // a changenumber key that follow the order. // A counter record must have its own chagenumber key since the Db does not // support duplicate key (it is a compatibility breaker character of the DB). // // We define 2 conditions to store a counter record : // 1/- at least 'counterWindowSize' changes have been stored in the Db // since the previous counter record // 2/- the change to be stored has a new timestamp - so that the counter // record is the first record for this timestamp. // private int counterCurrValue = 1; // Current value of the counter. private long counterTsLimit = 0; // When not null, // the next change with a ts different from tsForNewCounterRecord will lead // to store a new counterRecord. private int counterWindowSize = 1000; // The counter record will never be written to the db more often than each // counterWindowSize changes. /** * Creates a new database or open existing database that will be used * to store and retrieve changes from an LDAP server. * @param serverId The identifier of the LDAP server. * @param baseDn The baseDn of the replication domain. * @param replicationServer The ReplicationServer that needs to be shutdown. * @param dbenv The Db environment to use to create the db. * @throws DatabaseException If a database problem happened. */ public ReplicationDB(int serverId, String baseDn, ReplicationServer replicationServer, ReplicationDbEnv dbenv) throws DatabaseException { this.serverId = serverId; this.baseDn = baseDn; this.dbenv = dbenv; this.replicationServer = replicationServer; // Get or create the associated ReplicationServerDomain and Db. db = dbenv.getOrAddDb(serverId, baseDn, replicationServer.getReplicationServerDomain(baseDn, true).getGenerationId()); dbCloseLock = new ReentrantReadWriteLock(true); // Cursor cursor = null; Transaction txn = null; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status; int distBackToCounterRecord = 0; // Initialize counter this.counterCurrValue = 1; cursor = db.openCursor(txn, null); status = cursor.getLast(key, data, LockMode.DEFAULT); while (status == OperationStatus.SUCCESS) { try { ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8")); if (!ReplicationDB.isaCounter(cn)) { status = cursor.getPrev(key, data, LockMode.DEFAULT); distBackToCounterRecord++; } else { // counter record counterCurrValue = decodeCounterValue(data.getData())+1; counterTsLimit = cn.getTime(); break; } } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } catch (DataFormatException e) { // Should never happen } } counterCurrValue += distBackToCounterRecord; cursor.close(); } /** * add a list of changes to the underlying db. * * @param changes The list of changes to add to the underlying db. */ public void addEntries(List changes) { Transaction txn = null; try { int tries = 0; boolean done = false; // The database can return a Deadlock Exception if several threads are // accessing the database at the same time. This Exception is a // transient state, when it happens the transaction is aborted and // the operation is attempted again up to DEADLOCK_RETRIES times. while ((tries++ < DEADLOCK_RETRIES) && (!done)) { dbCloseLock.readLock().lock(); try { txn = dbenv.beginTransaction(); for (UpdateMsg change : changes) { DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); DatabaseEntry data = new ReplicationData(change); if ((counterCurrValue!=0) && (counterCurrValue%counterWindowSize == 0)) { // enough changes to generate a counter record - wait for the next // change fo time counterTsLimit = change.getChangeNumber().getTime(); } if ((counterTsLimit!=0) && (change.getChangeNumber().getTime() != counterTsLimit)) { // Write the counter record DatabaseEntry counterKey = new ReplicationKey( new ChangeNumber( change.getChangeNumber().getTime(), 0, 0)); DatabaseEntry counterValue = encodeCounterValue(counterCurrValue-1); db.put(txn, counterKey, counterValue); counterTsLimit=0; } db.put(txn, key, data); counterCurrValue++; } txn.commitWriteNoSync(); txn = null; done = true; } catch (LockConflictException e) { if (txn != null) txn.abort(); txn = null; } finally { dbCloseLock.readLock().unlock(); } } if (!done) { // Could not write to the DB after DEADLOCK_RETRIES tries. // This ReplicationServer is not reliable and will be shutdown. MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); if (txn != null) { txn.abort(); } replicationServer.shutdown(); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } } /** * Shutdown the database. */ public void shutdown() { try { dbCloseLock.writeLock().lock(); try { db.close(); } finally { dbCloseLock.writeLock().unlock(); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString())); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); } } /** * Create a cursor that can be used to search or iterate on this * ReplicationServer DB. * * @param changeNumber The ChangeNumber from which the cursor must start. * @throws DatabaseException If a database error prevented the cursor * creation. * @throws Exception if the ReplServerDBCursor creation failed. * @return The ReplServerDBCursor. */ public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber) throws DatabaseException, Exception { return new ReplServerDBCursor(changeNumber); } /** * Create a cursor that can be used to delete some record from this * ReplicationServer database. * * @throws DatabaseException If a database error prevented the cursor * creation. * @throws Exception if the ReplServerDBCursor creation failed. * * @return The ReplServerDBCursor. */ public ReplServerDBCursor openDeleteCursor() throws DatabaseException, Exception { return new ReplServerDBCursor(); } private void closeLockedCursor(Cursor cursor) throws DatabaseException { try { if (cursor != null) cursor.close(); } finally { dbCloseLock.readLock().unlock(); } } /** * Read the first Change from the database. * @return the first ChangeNumber. */ public ChangeNumber readFirstChange() { Cursor cursor = null; String str = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); cursor = db.openCursor(null, null); } catch (DatabaseException e1) { dbCloseLock.readLock().unlock(); return null; } try { try { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return null; } try { str = new String(key.getData(), "UTF-8"); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { // First record is a counter record .. go next status = cursor.getNext(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { // DB contains only a counter record return null; } else { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); } } } catch (UnsupportedEncodingException e) { // never happens } } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { /* database is faulty */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); cn = null; } return cn; } /** * Read the last Change from the database. * @return the last ChangeNumber. */ public ChangeNumber readLastChange() { Cursor cursor = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); try { cursor = db.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return null; } try { String str = new String(key.getData(), "UTF-8"); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* database only contain a counter record - don't know * how much it can be possible but ... */ cn = null; } } } catch (UnsupportedEncodingException e) { // never happens } } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); cn = null; } return cn; } /** * {@inheritDoc} */ @Override public String toString() { return serverId + baseDn.toString(); } /** * This Class implements a cursor that can be used to browse a * replicationServer database. */ public class ReplServerDBCursor { private Cursor cursor = null; // The transaction that will protect the actions done with the cursor // Will be let null for a read cursor // Will be set non null for a write cursor private Transaction txn = null; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); /** * Creates a ReplServerDBCursor that can be used for browsing a * replicationServer db. * * @param startingChangeNumber The ChangeNumber from which the cursor must * start. * @throws Exception When the startingChangeNumber does not exist. */ private ReplServerDBCursor(ChangeNumber startingChangeNumber) throws Exception { try { // Take the lock. From now on, whatever error that happen in the life // of this cursor should end by unlocking that lock. We must also // unlock it when throwing an exception. dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); if (startingChangeNumber != null) { key = new ReplicationKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not move the cursor to the expected startingChangeNumber if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not even move the cursor closed to it => failure throw new Exception("ChangeNumber not available"); } else { // We can move close to the startingChangeNumber. // Let's create a cursor from that point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { closeLockedCursor(cursor); dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); } } } } } catch (Exception e) { // Unlocking is required before throwing any exception closeLockedCursor(cursor); throw (e); } } private ReplServerDBCursor() throws DatabaseException { try { // We'll go on only if no close or no clear is running dbCloseLock.readLock().lock(); // Create the transaction that will protect whatever done with this // write cursor. txn = dbenv.beginTransaction(); cursor = db.openCursor(txn, null); } catch(DatabaseException e) { if (txn != null) { try { txn.abort(); } catch (DatabaseException dbe) {} } closeLockedCursor(cursor); throw (e); } } /** * Close the ReplicationServer Cursor. */ public void close() { try { closeLockedCursor(cursor); cursor = null; } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } if (txn != null) { try { txn.commit(); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } } } /** * Abort the Cursor after a Deadlock Exception. * This method catch and ignore the DeadlockException because * this must be done when aborting a cursor after a DeadlockException * (per the Cursor documentation). * This should not be used in any other case. */ public void abort() { if (cursor == null) return; try { closeLockedCursor(cursor); cursor = null; } catch (LockConflictException e1) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should // be ignored. } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } if (txn != null) { try { txn.abort(); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } } } /** * Get the next ChangeNumber in the database from this Cursor. * * @return The next ChangeNumber in the database from this cursor. * @throws DatabaseException In case of underlying database problem. */ public ChangeNumber nextChangeNumber() throws DatabaseException { OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { return null; } try { String csnString = new String(key.getData(), "UTF-8"); return new ChangeNumber(csnString); } catch (UnsupportedEncodingException e) { // can't happen return null; } } /** * Get the next UpdateMsg from this cursor. * * @return the next UpdateMsg. */ public UpdateMsg next() { UpdateMsg currentChange = null; while (currentChange == null) { try { OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { return null; } } catch (DatabaseException e) { return null; } try { ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8")); if(ReplicationDB.isaCounter(cn)) { // counter record continue; } currentChange = ReplicationData.generateChange(data.getData()); } catch (Exception e) { /* * An error happening trying to convert the data from the * replicationServer database to an Update Message. * This can only happen if the database is corrupted. * There is not much more that we can do at this point except trying * to continue with the next record. * In such case, it is therefore possible that we miss some changes. * TODO. log an error message. * TODO : REPAIR : Such problem should be handled by the * repair functionality. */ } } return currentChange; } /** * Delete the record at the current cursor position. * * @throws DatabaseException In case of database problem. */ public void delete() throws DatabaseException { cursor.delete(); } } // ReplServerDBCursor /** * Clears this change DB from the changes it contains. * * @throws Exception Throws an exception it occurs. * @throws DatabaseException Throws a DatabaseException when it occurs. */ public void clear() throws Exception, DatabaseException { // The coming users will be blocked until the clear is done dbCloseLock.writeLock().lock(); try { String dbName = db.getDatabaseName(); // Clears the reference to this serverID dbenv.clearServerId(baseDn, serverId); // Closing is requested by the Berkeley DB before truncate db.close(); // Clears the changes dbenv.clearDb(dbName); // RE-create the db db = dbenv.getOrAddDb(serverId, baseDn, (long)-1); } catch(Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(), e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } finally { // Relax the waiting users dbCloseLock.writeLock().unlock(); } } /** * Count the number of changes between 2 changes numbers (inclusive). * @param start The lower limit of the count. * @param stop The higher limit of the count. * @return The number of changes between provided start and stop changeNumber. * Returns -1 when an error occurs. */ public int count(ChangeNumber start, ChangeNumber stop) { int counterRecord1 = 0; int counterRecord2 = 0; int distToCounterRecord1 = 0; int distBackToCounterRecord2 = 0; int count=0; Cursor cursor = null; Transaction txn = null; OperationStatus status; try { ChangeNumber cn ; if ((start==null)&&(stop==null)) return (int)db.count(); // Step 1 : from the start point, traverse db to the next counter record // or to the stop point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(txn, null); if (start != null) { key = new ReplicationKey(start); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.NOTFOUND) status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); } else { status = cursor.getNext(key, data, LockMode.DEFAULT); } while (status == OperationStatus.SUCCESS) { // test whether the record is a regular change or a counter String csnString = new String(key.getData(), "UTF-8"); cn = new ChangeNumber(csnString); if (cn.getServerId() != 0) { // reached a regular change record // test whether we reached the 'stop' target if (!cn.newer(stop)) { // let's loop distToCounterRecord1++; status = cursor.getNext(key, data, LockMode.DEFAULT); } else { // reached the end break; } } else { // counter record counterRecord1 = decodeCounterValue(data.getData()); break; } } cursor.close(); // cases // if (counterRecord1==0) return distToCounterRecord1; // Step 2 : from the stop point, traverse db to the next counter record // or to the start point. txn = null; data = new DatabaseEntry(); key = new ReplicationKey(stop); cursor = db.openCursor(txn, null); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); } else { key = new DatabaseEntry(); data = new DatabaseEntry(); status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return 0; } } while (status == OperationStatus.SUCCESS) { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); if (!ReplicationDB.isaCounter(cn)) { // regular change record if (!cn.older(start)) { distBackToCounterRecord2++; status = cursor.getPrev(key, data, LockMode.DEFAULT); } else break; } else { // counter record counterRecord2 = decodeCounterValue(data.getData()); break; } } cursor.close(); // Step 3 : Now consolidates the result if (counterRecord1!=0) { if (counterRecord1 == counterRecord2) { // only one cp between from and to - no need to use it count = distToCounterRecord1 + distBackToCounterRecord2; } else { // 2 cp between from and to count = distToCounterRecord1 + (counterRecord2-counterRecord1) + distBackToCounterRecord2; } } } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } catch (DataFormatException e) { // Should never happen } finally { if (cursor != null) cursor.close(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } } return count; } /** * Test if a provided changeNumber represents a counter record. * @param cn The provided changeNumber. * @return True if the provided changenumber is a counter. */ static private boolean isaCounter(ChangeNumber cn) { return ((cn.getServerId()== 0) && (cn.getSeqnum()==0)); } /** * Decode the provided database entry as a the value of a counter. * @param entry The provided entry. * @return The counter value. * @throws DataFormatException */ private static int decodeCounterValue(byte[] entry) throws DataFormatException { try { String numAckStr = new String(entry, 0, entry.length, "UTF-8"); return Integer.parseInt(numAckStr); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Encode the provided counter value in a database entry. * @param entry The provided entry. * @return The databse entry with the counter value encoded inside.. * @throws UnsupportedEncodingException */ static private DatabaseEntry encodeCounterValue(int value) throws UnsupportedEncodingException { DatabaseEntry entry = new DatabaseEntry(); entry.setData(String.valueOf(value).getBytes("UTF-8")); return entry; } /** * Set the counter writing window size (public method for unit tests only). * @param size Size in number of record. */ public void setCounterWindowSize(int size) { this.counterWindowSize = size; } }