opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -478,8 +478,7 @@ /* the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor; cursor = db.openDeleteCursor(); ReplServerDBCursor cursor = db.openDeleteCursor(); try { @@ -517,7 +516,7 @@ throw (e); } } catch (DatabaseException e) catch (Exception e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -29,6 +29,7 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.decodeUTF8; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.UnsupportedEncodingException; @@ -264,14 +265,7 @@ /* database is empty */ return 0; } try { str = new String(key.getData(), "UTF-8"); } catch (UnsupportedEncodingException e) { // never happens, return anyway return 0; } str = decodeUTF8(key.getData()); int sn = new Integer(str); return sn; } @@ -341,14 +335,7 @@ /* database is empty */ return 0; } try { str = new String(key.getData(), "UTF-8"); } catch (UnsupportedEncodingException e) { // never happens, returns anyway return 0; } str = decodeUTF8(key.getData()); int sn = new Integer(str); return sn; } @@ -387,47 +374,57 @@ */ public class DraftCNDBCursor { private Cursor cursor = null; private final Cursor cursor; // The transaction that will protect the actions done with the cursor // Will be let null for a read cursor // Will be set non null for a write cursor private Transaction txn = null; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry entry = new DatabaseEntry(); private final Transaction txn; private final DatabaseEntry key; private final DatabaseEntry entry; private boolean isClosed = false; /** * Creates a cursor that can be used for browsing the db. * * @param startingDraftCN the draftCN from which the cursor must * start. * @throws Exception when the startingDraftCN does not exist. * @param startingDraftCN * the draftCN from which the cursor must start. * @throws Exception * when the startingDraftCN does not exist. */ private DraftCNDBCursor(int startingDraftCN) throws Exception { try { // For consistency with other constructor, we'll use a local here, // even though it's always null. final Transaction localTxn = null; Cursor localCursor = null; this.key = new ReplicationDraftCNKey(startingDraftCN); this.entry = new DatabaseEntry(); // Take the lock. From now on, whatever error that happen in the life // of this cursor should end by unlocking that lock. We must also // unlock it when throwing an exception. dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); try { localCursor = db.openCursor(localTxn, null); if (startingDraftCN >= 0) { key = new ReplicationDraftCNKey(startingDraftCN); entry = new DatabaseEntry(); if (cursor.getSearchKey(key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (localCursor.getSearchKey( key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not move the cursor to the expected startingChangeNumber if (cursor.getSearchKeyRange(key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (localCursor.getSearchKeyRange(key, entry, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not even move the cursor closed to it => failure throw new Exception("ChangeLog Draft Change Number " + startingDraftCN + " is not available"); throw new Exception("ChangeLog Draft Change Number " + startingDraftCN + " is not available"); } else { @@ -435,57 +432,76 @@ // Let's create a cursor from that point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (localCursor.getPrev( key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { closeLockedCursor(cursor); dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); localCursor.close(); localCursor = db.openCursor(localTxn, null); } } } else { // success : key has the right value } } this.txn = localTxn; this.cursor = localCursor; } catch (Exception e) { // Unlocking is required before throwing any exception closeLockedCursor(cursor); throw (e); closeLockedCursor(localCursor); throw e; } } private DraftCNDBCursor() throws DatabaseException private DraftCNDBCursor() throws Exception { try { Transaction localTxn = null; Cursor localCursor = null; this.key = new DatabaseEntry(); this.entry = new DatabaseEntry(); // We'll go on only if no close or no clear is running dbCloseLock.readLock().lock(); try { // Create the transaction that will protect whatever done with this // write cursor. txn = dbenv.beginTransaction(); localTxn = dbenv.beginTransaction(); localCursor = db.openCursor(localTxn, null); cursor = db.openCursor(txn, null); this.txn = localTxn; this.cursor = localCursor; } catch(DatabaseException e) catch (Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); if (txn != null) try { closeLockedCursor(localCursor); } catch (DatabaseException ignored) { // Ignore. TRACER.debugCaught(DebugLogLevel.ERROR, ignored); } if (localTxn != null) { try { txn.abort(); localTxn.abort(); } catch (DatabaseException dbe) {} catch (DatabaseException ignored) { // Ignore. TRACER.debugCaught(DebugLogLevel.ERROR, ignored); } closeLockedCursor(cursor); throw (e); } throw e; } } @@ -494,33 +510,50 @@ */ public void close() { synchronized (this) { if (isClosed) { return; } isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); cursor = null; } catch (DatabaseException e) catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } if (txn != null) { try { txn.commit(); } catch (DatabaseException e) } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -532,49 +565,63 @@ */ public void abort() { if (cursor == null) synchronized (this) { if (isClosed) { return; } isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); cursor = null; } catch (LockConflictException e1) catch (LockConflictException e) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should // be ignored. } catch (DatabaseException e) catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } if (txn != null) { try { txn.abort(); } catch (DatabaseException e) } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } } if (closeHasFailed) { replicationServer.shutdown(); } } /** * Getter for the value field of the current cursor. * @return The current value field. * @throws DatabaseException When an error happens. */ public String currentValue() throws DatabaseException public String currentValue() { try { @@ -598,9 +645,8 @@ /** * Getter for the serviceID field of the current cursor. * @return The current serviceID. * @throws DatabaseException When an error happens. */ public String currentServiceID() throws DatabaseException public String currentServiceID() { try { @@ -616,7 +662,7 @@ } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } return null; } @@ -624,9 +670,8 @@ /** * Returns the replication changeNumber associated with the current key. * @return the replication changeNumber * @throws DatabaseException when a problem occurs. */ public ChangeNumber currentChangeNumber() throws DatabaseException public ChangeNumber currentChangeNumber() { try { @@ -672,6 +717,16 @@ { cursor.delete(); } /** * Returns the current key associated with this cursor. * * @return The current key associated with this cursor. */ public DatabaseEntry getKey() { return key; } } /** opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -417,16 +417,16 @@ // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. shutdown = true; throw (e); throw e; } } catch (DatabaseException e) catch (Exception e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); throw (e); throw e; } } } opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -124,7 +125,7 @@ */ public int getDraftCN() { ReplicationDraftCNKey sk = (ReplicationDraftCNKey)this.draftCNDbCursor.key; ReplicationDraftCNKey sk = (ReplicationDraftCNKey) draftCNDbCursor.getKey(); int currentSeqnum = sk.getDraftCN(); return currentSeqnum; } opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -350,18 +350,22 @@ new ReplicationIteratorComparator(); SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>(comparator); try { /* fill the lateQueue */ for (int serverId : replicationServerDomain.getServers()) { ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); ReplicationIterator iterator = replicationServerDomain.getChangelogIterator(serverId, lastCsn); ChangeNumber lastCsn = serverState .getMaxChangeNumber(serverId); ReplicationIterator iterator = replicationServerDomain .getChangelogIterator(serverId, lastCsn); if (iterator != null) { if (iterator.getChange() != null) { iteratorSortedSet.add(iterator); } else } else { iterator.releaseCursor(); } @@ -371,25 +375,36 @@ // The loop below relies on the fact that it is sorted based // on the currentChange of each iterator to consider the next // change across all servers. // Hence it is necessary to remove and eventual add again an iterator // when looping in order to keep consistent the order of the // iterators (see ReplicationIteratorComparator. while (!iteratorSortedSet.isEmpty() && (lateQueue.count()<100) && (lateQueue.bytesCount()<50000) ) // // Hence it is necessary to remove and eventual add again an // iterator when looping in order to keep consistent the order of // the iterators (see ReplicationIteratorComparator. while (!iteratorSortedSet.isEmpty() && (lateQueue.count() < 100) && (lateQueue.bytesCount() < 50000)) { ReplicationIterator iterator = iteratorSortedSet.first(); ReplicationIterator iterator = iteratorSortedSet .first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); if (iterator.next()) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } } } finally { for (ReplicationIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } } /* * If the late queue is empty then we could not find any * messages in the replication log so the remote serevr is not @@ -527,10 +542,17 @@ // if that iterator has changes, then it is a candidate // it is added in the sorted list at a position given by its // current change (see ReplicationIteratorComparator). if ((iterator != null) && (iterator.getChange() != null)) if (iterator != null) { if (iterator.getChange() != null) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } } } UpdateMsg msg = iteratorSortedSet.first().getChange(); result = msg.getChangeNumber(); opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@ import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.decodeUTF8; import static org.opends.server.util.StaticUtils.getBytes; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.util.List; @@ -37,8 +39,8 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.DataFormatException; import com.sleepycat.je.Cursor; import com.sleepycat.je.DatabaseEntry; @@ -144,12 +146,12 @@ // Initialize counter this.counterCurrValue = 1; cursor = db.openCursor(txn, null); try { status = cursor.getLast(key, data, LockMode.DEFAULT); while (status == OperationStatus.SUCCESS) { try { ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8")); ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData())); if (!ReplicationDB.isaCounter(cn)) { status = cursor.getPrev(key, data, LockMode.DEFAULT); @@ -163,33 +165,12 @@ break; } } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } catch (DataFormatException e) { // Should never happen } } counterCurrValue += distBackToCounterRecord; } finally { cursor.close(); } } /** @@ -377,31 +358,23 @@ String str = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); cursor = db.openCursor(null, null); } catch (DatabaseException e1) { dbCloseLock.readLock().unlock(); return null; } try { try { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(null, null); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return null; } try { str = new String(key.getData(), "UTF-8"); str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { @@ -414,18 +387,9 @@ } else { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); cn = new ChangeNumber(decodeUTF8(key.getData())); } } } catch (UnsupportedEncodingException e) { // never happens } } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { @@ -437,11 +401,18 @@ replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } return cn; } /** * Read the last Change from the database. * * @return the last ChangeNumber. */ public ChangeNumber readLastChange() @@ -449,45 +420,38 @@ Cursor cursor = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); try { cursor = db.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); cursor = db.openCursor(null, null); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return null; } try { String str = new String(key.getData(), "UTF-8"); String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* database only contain a counter record - don't know * how much it can be possible but ... */ /* * database only contain a counter record - don't know how much it can * be possible but ... */ cn = null; } } } catch (UnsupportedEncodingException e) { // never happens } } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); @@ -497,6 +461,11 @@ replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } return cn; } @@ -515,44 +484,56 @@ */ public class ReplServerDBCursor { private Cursor cursor = null; // The transaction that will protect the actions done with the cursor // Will be let null for a read cursor // Will be set non null for a write cursor private Transaction txn = null; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); private final Transaction txn; private final Cursor cursor; private final DatabaseEntry key; private final DatabaseEntry data; private boolean isClosed = false; /** * Creates a ReplServerDBCursor that can be used for browsing a * replicationServer db. * * @param startingChangeNumber The ChangeNumber from which the cursor must * start. * @throws Exception When the startingChangeNumber does not exist. * @param startingChangeNumber * The ChangeNumber from which the cursor must start. * @throws Exception * When the startingChangeNumber does not exist. */ private ReplServerDBCursor(ChangeNumber startingChangeNumber) throws Exception { try if (startingChangeNumber != null) { key = new ReplicationKey(startingChangeNumber); } else { key = new DatabaseEntry(); } data = new DatabaseEntry(); txn = null; // Take the lock. From now on, whatever error that happen in the life // of this cursor should end by unlocking that lock. We must also // unlock it when throwing an exception. dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); Cursor localCursor = null; try { localCursor = db.openCursor(txn, null); if (startingChangeNumber != null) { key = new ReplicationKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not move the cursor to the expected startingChangeNumber if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not even move the cursor closed to it => failure @@ -564,51 +545,75 @@ // Let's create a cursor from that point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != if (localCursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { closeLockedCursor(cursor); dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); localCursor.close(); localCursor = db.openCursor(txn, null); } } } } cursor = localCursor; } catch (Exception e) { // Unlocking is required before throwing any exception closeLockedCursor(cursor); throw (e); try { closeLockedCursor(localCursor); } catch (Exception ignore) { // Ignore. } throw e; } } private ReplServerDBCursor() throws DatabaseException private ReplServerDBCursor() throws Exception { try { key = new DatabaseEntry(); data = new DatabaseEntry(); // We'll go on only if no close or no clear is running dbCloseLock.readLock().lock(); Transaction localTxn = null; Cursor localCursor = null; try { // Create the transaction that will protect whatever done with this // write cursor. txn = dbenv.beginTransaction(); localTxn = dbenv.beginTransaction(); localCursor = db.openCursor(localTxn, null); cursor = db.openCursor(txn, null); txn = localTxn; cursor = localCursor; } catch(DatabaseException e) { if (txn != null) catch (Exception e) { try { txn.abort(); closeLockedCursor(localCursor); } catch (DatabaseException dbe) {} catch (Exception ignore) { // Ignore. } closeLockedCursor(cursor); throw (e); if (localTxn != null) { try { localTxn.abort(); } catch (DatabaseException ignore) { // Ignore. } } throw e; } } @@ -617,10 +622,20 @@ */ public void close() { synchronized (this) { if (isClosed) { return; } isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); cursor = null; } catch (DatabaseException e) { @@ -628,22 +643,29 @@ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } if (txn != null) { try { txn.commit(); } catch (DatabaseException e) } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -655,14 +677,22 @@ */ public void abort() { if (cursor == null) synchronized (this) { if (isClosed) { return; } isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); cursor = null; } catch (LockConflictException e1) catch (LockConflictException e) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should @@ -674,22 +704,29 @@ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } if (txn != null) { try { txn.abort(); } catch (DatabaseException e) } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); closeHasFailed = true; } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -706,15 +743,8 @@ { return null; } try { String csnString = new String(key.getData(), "UTF-8"); String csnString = decodeUTF8(key.getData()); return new ChangeNumber(csnString); } catch (UnsupportedEncodingException e) { // can't happen return null; } } /** @@ -738,26 +768,29 @@ { return null; } try { ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8")); ChangeNumber cn = new ChangeNumber( decodeUTF8(key.getData())); if(ReplicationDB.isaCounter(cn)) { // counter record continue; } currentChange = ReplicationData.generateChange(data.getData()); } catch (Exception e) { currentChange = ReplicationData.generateChange(data .getData()); } catch (Exception e) { /* * An error happening trying to convert the data from the * replicationServer database to an Update Message. * This can only happen if the database is corrupted. * There is not much more that we can do at this point except trying * to continue with the next record. * In such case, it is therefore possible that we miss some changes. * TODO. log an error message. * TODO : REPAIR : Such problem should be handled by the * repair functionality. * replicationServer database to an Update Message. This can only * happen if the database is corrupted. There is not much more that we * can do at this point except trying to continue with the next * record. In such case, it is therefore possible that we miss some * changes. TODO. log an error message. TODO : REPAIR : Such problem * should be handled by the repair functionality. */ } } @@ -859,7 +892,7 @@ while (status == OperationStatus.SUCCESS) { // test whether the record is a regular change or a counter String csnString = new String(key.getData(), "UTF-8"); String csnString = decodeUTF8(key.getData()); cn = new ChangeNumber(csnString); if (cn.getServerId() != 0) { @@ -900,7 +933,7 @@ status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); cn = new ChangeNumber(decodeUTF8(key.getData())); } else { @@ -915,7 +948,7 @@ } while (status == OperationStatus.SUCCESS) { cn = new ChangeNumber(new String(key.getData(), "UTF-8")); cn = new ChangeNumber(decodeUTF8(key.getData())); if (!ReplicationDB.isaCounter(cn)) { // regular change record @@ -952,18 +985,6 @@ } } } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } catch (DataFormatException e) { // Should never happen } finally { if (cursor != null) @@ -975,7 +996,7 @@ txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. // can't do much more. The ReplicationServer is shutting down. } } } @@ -996,33 +1017,22 @@ * Decode the provided database entry as a the value of a counter. * @param entry The provided entry. * @return The counter value. * @throws DataFormatException */ private static int decodeCounterValue(byte[] entry) throws DataFormatException { try { String numAckStr = new String(entry, 0, entry.length, "UTF-8"); String numAckStr = decodeUTF8(entry); return Integer.parseInt(numAckStr); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Encode the provided counter value in a database entry. * @param entry The provided entry. * @return The databse entry with the counter value encoded inside.. * @throws UnsupportedEncodingException * @return The database entry with the counter value encoded inside. */ static private DatabaseEntry encodeCounterValue(int value) throws UnsupportedEncodingException { DatabaseEntry entry = new DatabaseEntry(); entry.setData(String.valueOf(value).getBytes("UTF-8")); entry.setData(getBytes(String.valueOf(value))); return entry; } opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -32,6 +32,8 @@ import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.DebugLogLevel; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -131,9 +133,9 @@ */ private void start() throws DatabaseException, ReplicationDBException { Cursor cursor = stateDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); Cursor cursor = stateDb.openCursor(null, null); try { @@ -259,8 +261,15 @@ } finally { try { cursor.close(); } catch (Exception ignored) { TRACER.debugCaught(DebugLogLevel.ERROR, ignored); } } } /** opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1452,33 +1452,6 @@ } /** * Creates and returns an iterator. * When the iterator is not used anymore, the caller MUST call the * ReplicationIterator.releaseCursor() method to free the resources * and locks used by the ReplicationIterator. * * @param serverId Identifier of the server for which the iterator is created. * @param changeNumber Starting point for the iterator. * @return the created ReplicationIterator. Null when no DB is available * for the provided server Id. */ public ReplicationIterator getIterator(int serverId, ChangeNumber changeNumber) { DbHandler handler = sourceDbHandlers.get(serverId); if (handler == null) return null; try { ReplicationIterator it = handler.generateIterator(changeNumber); return it; } catch (Exception e) { return null; } } /** * Returns the change count for that ReplicationServerDomain. * * @return the change count. opends/src/server/org/opends/server/util/StaticUtils.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.util; @@ -31,13 +32,7 @@ import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.ServerConstants.*; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.*; import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -170,6 +165,61 @@ /** * Returns the provided byte array decoded as a UTF-8 string without throwing * an UnsupportedEncodingException. This method is equivalent to: * * <pre> * try * { * return new String(bytes, "UTF-8"); * } * catch (UnsupportedEncodingException e) * { * // Should never happen: UTF-8 is always supported. * throw new RuntimeException(e); * } * </pre> * * @param bytes * The byte array to be decoded as a UTF-8 string. * @return The decoded string. */ public static String decodeUTF8(final byte[] bytes) { Validator.ensureNotNull(bytes); if (bytes.length == 0) { return "".intern(); } final StringBuilder builder = new StringBuilder(bytes.length); final int sz = bytes.length; for (int i = 0; i < sz; i++) { final byte b = bytes[i]; if ((b & 0x7f) != b) { try { builder.append(new String(bytes, i, (sz - i), "UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happen: UTF-8 is always supported. throw new RuntimeException(e); } break; } builder.append((char) b); } return builder.toString(); } /** * Construct a byte array containing the UTF-8 encoding of the * provided <code>char</code> array. * opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
@@ -23,6 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -126,6 +127,8 @@ assertEquals(handler.getLastKey(), sn3); DraftCNDBCursor dbc = handler.getReadCursor(firstkey); try { assertEquals(dbc.currentChangeNumber(), changeNumber1); assertEquals(dbc.currentServiceID(), serviceID1); assertEquals(dbc.currentValue(), value1); @@ -144,8 +147,11 @@ assertEquals(dbc.currentValue(), value3); assertFalse(dbc.next()); } finally { handler.releaseReadCursor(dbc); } handler.setPurgeDelay(100); @@ -258,25 +264,43 @@ assertEquals(handler.getValue(sn3),value3); DraftCNDbIterator it = handler.generateIterator(sn1); try { assertEquals(it.getDraftCN(),sn1); assertTrue(it.next()); assertEquals(it.getDraftCN(),sn2); assertTrue(it.next()); assertEquals(it.getDraftCN(),sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } it = handler.generateIterator(sn2); try { assertEquals(it.getDraftCN(),sn2); assertTrue(it.next()); assertEquals(it.getDraftCN(),sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } it = handler.generateIterator(sn3); try { assertEquals(it.getDraftCN(),sn3); assertFalse(it.next()); } finally { it.releaseCursor(); } // Clear ... handler.clear(); opends/tests/unit-tests-testng/src/server/org/opends/server/util/TestStaticUtils.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.util; @@ -138,6 +139,21 @@ } /** * Tests the {@link StaticUtils#decodeUTF8(byte[])} method. * * @param inputString * The input string. * @throws Exception * If the test failed unexpectedly. */ @Test(dataProvider = "getBytesTestData") public void testDecodeUTF8(String inputString) throws Exception { final byte[] bytes = inputString.getBytes("UTF-8"); Assert.assertEquals(StaticUtils.decodeUTF8(bytes), inputString); } /** * Tests the {@link StaticUtils#getBytes(String)} method. * * @param inputString @@ -1174,7 +1190,7 @@ * If the test failed unexpectedly. */ @Test(dataProvider = "listsAreEqualTestData") public void testListsAreEqual(List list1, List list2, boolean result) public void testListsAreEqual(List<?> list1, List<?> list2, boolean result) throws Exception { Assert.assertEquals(StaticUtils.listsAreEqual(list1, list2), result); }