| | |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.decodeUTF8; |
| | | import static org.opends.server.util.StaticUtils.getBytes; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.util.List; |
| | |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | |
| | | // Initialize counter |
| | | this.counterCurrValue = 1; |
| | | cursor = db.openCursor(txn, null); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | try |
| | | { |
| | | try |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | |
| | | else |
| | | { |
| | | // counter record |
| | | counterCurrValue = decodeCounterValue(data.getData())+1; |
| | | counterCurrValue = decodeCounterValue(data.getData()) + 1; |
| | | counterTsLimit = cn.getTime(); |
| | | break; |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The ReplicationServer is shuting down. |
| | | } |
| | | } |
| | | replicationServer.shutdown(); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | counterCurrValue += distBackToCounterRecord; |
| | | } |
| | | counterCurrValue += distBackToCounterRecord; |
| | | cursor.close(); |
| | | |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | String str = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | } |
| | | catch (DatabaseException e1) |
| | | { |
| | | dbCloseLock.readLock().unlock(); |
| | | return null; |
| | | } |
| | | try |
| | | { |
| | | try |
| | | |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | |
| | | str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | try |
| | | else |
| | | { |
| | | str = new String(key.getData(), "UTF-8"); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | } |
| | | } |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | replicationServer.shutdown(); |
| | | cn = null; |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Read the last Change from the database. |
| | | * |
| | | * @return the last ChangeNumber. |
| | | */ |
| | | public ChangeNumber readLastChange() |
| | |
| | | Cursor cursor = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | { |
| | | dbCloseLock.readLock().lock(); |
| | | try |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | cursor = db.openCursor(null, null); |
| | | |
| | | OperationStatus status = cursor.getLast(key, data, |
| | | LockMode.DEFAULT); |
| | | |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | cursor = db.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | try |
| | | { |
| | | String str = new String(key.getData(), "UTF-8"); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | /* database only contain a counter record - don't know |
| | | * how much it can be possible but ... */ |
| | | cn = null; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | } |
| | | /* database is empty */ |
| | | return null; |
| | | } |
| | | finally |
| | | |
| | | String str = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | closeLockedCursor(cursor); |
| | | if (cursor.getPrev(key, data, |
| | | LockMode.DEFAULT) != OperationStatus.SUCCESS) |
| | | { |
| | | /* |
| | | * database only contain a counter record - don't know how much it can |
| | | * be possible but ... |
| | | */ |
| | | cn = null; |
| | | } |
| | | } |
| | | } |
| | | catch (DatabaseException e) |
| | |
| | | replicationServer.shutdown(); |
| | | cn = null; |
| | | } |
| | | finally |
| | | { |
| | | closeLockedCursor(cursor); |
| | | } |
| | | |
| | | return cn; |
| | | } |
| | | |
| | |
| | | */ |
| | | public class ReplServerDBCursor |
| | | { |
| | | private Cursor cursor = null; |
| | | |
| | | // The transaction that will protect the actions done with the cursor |
| | | // The transaction that will protect the actions done with the cursor |
| | | // Will be let null for a read cursor |
| | | // Will be set non null for a write cursor |
| | | private Transaction txn = null; |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | private final Transaction txn; |
| | | private final Cursor cursor; |
| | | private final DatabaseEntry key; |
| | | private final DatabaseEntry data; |
| | | |
| | | private boolean isClosed = false; |
| | | |
| | | /** |
| | | * Creates a ReplServerDBCursor that can be used for browsing a |
| | | * replicationServer db. |
| | | * |
| | | * @param startingChangeNumber The ChangeNumber from which the cursor must |
| | | * start. |
| | | * @throws Exception When the startingChangeNumber does not exist. |
| | | * @param startingChangeNumber |
| | | * The ChangeNumber from which the cursor must start. |
| | | * @throws Exception |
| | | * When the startingChangeNumber does not exist. |
| | | */ |
| | | private ReplServerDBCursor(ChangeNumber startingChangeNumber) |
| | | throws Exception |
| | | throws Exception |
| | | { |
| | | if (startingChangeNumber != null) |
| | | { |
| | | key = new ReplicationKey(startingChangeNumber); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | } |
| | | data = new DatabaseEntry(); |
| | | |
| | | txn = null; |
| | | |
| | | // Take the lock. From now on, whatever error that happen in the life |
| | | // of this cursor should end by unlocking that lock. We must also |
| | | // unlock it when throwing an exception. |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | Cursor localCursor = null; |
| | | try |
| | | { |
| | | // Take the lock. From now on, whatever error that happen in the life |
| | | // of this cursor should end by unlocking that lock. We must also |
| | | // unlock it when throwing an exception. |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | cursor = db.openCursor(txn, null); |
| | | localCursor = db.openCursor(txn, null); |
| | | if (startingChangeNumber != null) |
| | | { |
| | | key = new ReplicationKey(startingChangeNumber); |
| | | data = new DatabaseEntry(); |
| | | |
| | | if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != |
| | | if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) != |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | // We could not move the cursor to the expected startingChangeNumber |
| | | if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != |
| | | if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | // We could not even move the cursor closed to it => failure |
| | |
| | | // Let's create a cursor from that point. |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != |
| | | if (localCursor.getPrev(key, data, LockMode.DEFAULT) != |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | closeLockedCursor(cursor); |
| | | dbCloseLock.readLock().lock(); |
| | | cursor = db.openCursor(txn, null); |
| | | localCursor.close(); |
| | | localCursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | cursor = localCursor; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Unlocking is required before throwing any exception |
| | | closeLockedCursor(cursor); |
| | | throw (e); |
| | | // Unlocking is required before throwing any exception |
| | | try |
| | | { |
| | | closeLockedCursor(localCursor); |
| | | } |
| | | catch (Exception ignore) |
| | | { |
| | | // Ignore. |
| | | } |
| | | throw e; |
| | | } |
| | | } |
| | | |
| | | private ReplServerDBCursor() throws DatabaseException |
| | | private ReplServerDBCursor() throws Exception |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | |
| | | // We'll go on only if no close or no clear is running |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | Transaction localTxn = null; |
| | | Cursor localCursor = null; |
| | | try |
| | | { |
| | | // We'll go on only if no close or no clear is running |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | // Create the transaction that will protect whatever done with this |
| | | // write cursor. |
| | | txn = dbenv.beginTransaction(); |
| | | localTxn = dbenv.beginTransaction(); |
| | | localCursor = db.openCursor(localTxn, null); |
| | | |
| | | cursor = db.openCursor(txn, null); |
| | | txn = localTxn; |
| | | cursor = localCursor; |
| | | } |
| | | catch(DatabaseException e) |
| | | catch (Exception e) |
| | | { |
| | | if (txn != null) |
| | | try |
| | | { |
| | | closeLockedCursor(localCursor); |
| | | } |
| | | catch (Exception ignore) |
| | | { |
| | | // Ignore. |
| | | } |
| | | |
| | | if (localTxn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | localTxn.abort(); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | {} |
| | | catch (DatabaseException ignore) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | closeLockedCursor(cursor); |
| | | throw (e); |
| | | throw e; |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void close() |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | isClosed = true; |
| | | } |
| | | |
| | | boolean closeHasFailed = false; |
| | | |
| | | try |
| | | { |
| | | closeLockedCursor(cursor); |
| | | cursor = null; |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | closeHasFailed = true; |
| | | } |
| | | |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.commit(); |
| | | } catch (DatabaseException e) |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | closeHasFailed = true; |
| | | } |
| | | } |
| | | |
| | | if (closeHasFailed) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void abort() |
| | | { |
| | | if (cursor == null) |
| | | return; |
| | | synchronized (this) |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | isClosed = true; |
| | | } |
| | | |
| | | boolean closeHasFailed = false; |
| | | |
| | | try |
| | | { |
| | | closeLockedCursor(cursor); |
| | | cursor = null; |
| | | } |
| | | catch (LockConflictException e1) |
| | | catch (LockConflictException e) |
| | | { |
| | | // The DB documentation states that a DeadlockException |
| | | // on the close method of a cursor that is aborting should |
| | |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | closeHasFailed = true; |
| | | } |
| | | |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | } catch (DatabaseException e) |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | closeHasFailed = true; |
| | | } |
| | | } |
| | | |
| | | if (closeHasFailed) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return null; |
| | | } |
| | | try |
| | | { |
| | | String csnString = new String(key.getData(), "UTF-8"); |
| | | return new ChangeNumber(csnString); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | String csnString = decodeUTF8(key.getData()); |
| | | return new ChangeNumber(csnString); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | try |
| | | { |
| | | ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | if(ReplicationDB.isaCounter(cn)) |
| | | ChangeNumber cn = new ChangeNumber( |
| | | decodeUTF8(key.getData())); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // counter record |
| | | continue; |
| | | } |
| | | currentChange = ReplicationData.generateChange(data.getData()); |
| | | } catch (Exception e) { |
| | | currentChange = ReplicationData.generateChange(data |
| | | .getData()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | /* |
| | | * An error happening trying to convert the data from the |
| | | * replicationServer database to an Update Message. |
| | | * This can only happen if the database is corrupted. |
| | | * There is not much more that we can do at this point except trying |
| | | * to continue with the next record. |
| | | * In such case, it is therefore possible that we miss some changes. |
| | | * TODO. log an error message. |
| | | * TODO : REPAIR : Such problem should be handled by the |
| | | * repair functionality. |
| | | * replicationServer database to an Update Message. This can only |
| | | * happen if the database is corrupted. There is not much more that we |
| | | * can do at this point except trying to continue with the next |
| | | * record. In such case, it is therefore possible that we miss some |
| | | * changes. TODO. log an error message. TODO : REPAIR : Such problem |
| | | * should be handled by the repair functionality. |
| | | */ |
| | | } |
| | | } |
| | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | String csnString = new String(key.getData(), "UTF-8"); |
| | | String csnString = decodeUTF8(key.getData()); |
| | | cn = new ChangeNumber(csnString); |
| | | if (cn.getServerId() != 0) |
| | | { |
| | |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | cn = new ChangeNumber(decodeUTF8(key.getData())); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // regular change record |
| | |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | finally |
| | | { |
| | | if (cursor != null) |
| | |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The ReplicationServer is shuting down. |
| | | // can't do much more. The ReplicationServer is shutting down. |
| | | } |
| | | } |
| | | } |
| | |
| | | * Decode the provided database entry as the value of a counter. |
| | | * @param entry The provided entry. |
| | | * @return The counter value. |
| | | * @throws DataFormatException |
| | | */ |
| | | private static int decodeCounterValue(byte[] entry) |
| | | throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | String numAckStr = new String(entry, 0, entry.length, "UTF-8"); |
| | | return Integer.parseInt(numAckStr); |
| | | |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | String numAckStr = decodeUTF8(entry); |
| | | return Integer.parseInt(numAckStr); |
| | | } |
| | | |
| | | /** |
| | | * Encode the provided counter value in a database entry. |
| | | * @param entry The provided entry. |
| | | * @return The databse entry with the counter value encoded inside.. |
| | | * @throws UnsupportedEncodingException |
| | | * @return The database entry with the counter value encoded inside. |
| | | */ |
| | | static private DatabaseEntry encodeCounterValue(int value) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | DatabaseEntry entry = new DatabaseEntry(); |
| | | entry.setData(String.valueOf(value).getBytes("UTF-8")); |
| | | entry.setData(getBytes(String.valueOf(value))); |
| | | return entry; |
| | | } |
| | | |