| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | |
| | | // close the db (shutdown or clear). |
| | | private ReentrantReadWriteLock dbCloseLock; |
| | | |
| | | // Change counter management |
| | | // The Db itself offers no way to count the records between a start and an |
| | | // end change. And we cannot rely on the replication seqnum that is part of |
| | | // the changenumber, since there can be holes (when an operation is |
| | | // canceled). Traversing all the records from the start one to the end one |
| | | // works fine but can take very long (ECL:lastChangeNumber). |
| | | // |
| | | // So we are storing special records in the DB (called counter records), |
| | | // that contain the number of changes since the previous counter record. |
| | | // One special record is : |
| | | // - a special key : changetime , serverid=0 seqnum=0 |
| | | // - a counter value : count of changes since previous counter record. |
| | | // |
| | | // A counter record has to follow the order of the db, so it needs to have |
| | | // a changenumber key that follows that order. |
| | | // A counter record must have its own changenumber key since the Db does not |
| | | // support duplicate keys (it is a compatibility breaker character of the DB). |
| | | // |
| | | // We define 2 conditions to store a counter record : |
| | | // 1/- at least 'counterWindowSize' changes have been stored in the Db |
| | | // since the previous counter record |
| | | // 2/- the change to be stored has a new timestamp - so that the counter |
| | | // record is the first record for this timestamp. |
| | | // |
| | | |
| | | |
| | | private int counterCurrValue = 1; |
| | | // Current value of the change counter. It starts at 1 and is incremented |
| | | // each time a change is stored; a counter record stores this value minus 1. |
| | | |
| | | private long counterTsLimit = 0; |
| | | // When not zero, holds the timestamp of the change that completed the |
| | | // current window: the next change whose timestamp differs from this value |
| | | // leads to storing a new counter record, after which it is reset to 0. |
| | | |
| | | private int counterWindowSize = 1000; |
| | | // A counter record is never written to the db more often than once every |
| | | // counterWindowSize changes. |
| | | |
| | | /** |
| | | * Creates a new database or open existing database that will be used |
| | | * to store and retrieve changes from an LDAP server. |
| | |
| | | true).getGenerationId()); |
| | | |
| | | dbCloseLock = new ReentrantReadWriteLock(true); |
| | | |
| | | // |
| | | Cursor cursor = null; |
| | | Transaction txn = null; |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status; |
| | | int distBackToCounterRecord = 0; |
| | | |
| | | // Initialize counter |
| | | this.counterCurrValue = 1; |
| | | cursor = db.openCursor(txn, null); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | distBackToCounterRecord++; |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterCurrValue = decodeCounterValue(data.getData())+1; |
| | | counterTsLimit = cn.getTime(); |
| | | break; |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The ReplicationServer is shuting down. |
| | | } |
| | | } |
| | | replicationServer.shutdown(); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | } |
| | | counterCurrValue += distBackToCounterRecord; |
| | | cursor.close(); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); |
| | | DatabaseEntry data = new ReplicationData(change); |
| | | db.put(txn, key, data); |
| | | } |
| | | |
| | | if ((counterCurrValue!=0) && |
| | | (counterCurrValue%counterWindowSize == 0)) |
| | | { |
| | | // enough changes to generate a counter record - wait for the next |
| | | // change fo time |
| | | counterTsLimit = change.getChangeNumber().getTime(); |
| | | } |
| | | if ((counterTsLimit!=0) |
| | | && (change.getChangeNumber().getTime() != counterTsLimit)) |
| | | { |
| | | // Write the counter record |
| | | DatabaseEntry counterKey = new ReplicationKey( |
| | | new ChangeNumber( |
| | | change.getChangeNumber().getTime(), |
| | | 0, 0)); |
| | | DatabaseEntry counterValue = |
| | | encodeCounterValue(counterCurrValue-1); |
| | | db.put(txn, counterKey, counterValue); |
| | | counterTsLimit=0; |
| | | } |
| | | db.put(txn, key, data); |
| | | counterCurrValue++; |
| | | |
| | | } |
| | | txn.commitWriteNoSync(); |
| | | txn = null; |
| | | done = true; |
| | |
| | | { |
| | | Cursor cursor = null; |
| | | String str = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | try |
| | | { |
| | |
| | | try |
| | | { |
| | | str = new String(key.getData(), "UTF-8"); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // First record is a counter record .. go next |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | // DB contains only a counter record |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | } |
| | | } |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | } |
| | | return new ChangeNumber(str); |
| | | } |
| | | finally |
| | | { |
| | |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | return null; |
| | | cn = null; |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | /** |
| | |
| | | public ChangeNumber readLastChange() |
| | | { |
| | | Cursor cursor = null; |
| | | String str = null; |
| | | ChangeNumber cn = null; |
| | | |
| | | try |
| | | { |
| | |
| | | } |
| | | try |
| | | { |
| | | str = new String(key.getData(), "UTF-8"); |
| | | String str = new String(key.getData(), "UTF-8"); |
| | | cn = new ChangeNumber(str); |
| | | if (ReplicationDB.isaCounter(cn)) |
| | | { |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != |
| | | OperationStatus.SUCCESS) |
| | | { |
| | | /* database only contain a counter record - don't know |
| | | * how much it can be possible but ... */ |
| | | cn = null; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | } |
| | | return new ChangeNumber(str); |
| | | } |
| | | finally |
| | | { |
| | |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | return null; |
| | | cn = null; |
| | | } |
| | | return cn; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return null; |
| | | } |
| | | try { |
| | | try |
| | | { |
| | | ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | if(ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // counter record |
| | | continue; |
| | | } |
| | | currentChange = ReplicationData.generateChange(data.getData()); |
| | | } catch (Exception e) { |
| | | /* |
| | |
| | | dbCloseLock.writeLock().unlock(); |
| | | } |
| | | } |
| | | /** |
| | | * Count the number of changes between 2 changes numbers (inclusive). |
| | | * @param start The lower limit of the count. |
| | | * @param stop The higher limit of the count. |
| | | * @return The number of changes between provided start and stop changeNumber. |
| | | * Returns -1 when an error occurs. |
| | | */ |
| | | public int count(ChangeNumber start, ChangeNumber stop) |
| | | { |
| | | int counterRecord1 = 0; |
| | | int counterRecord2 = 0; |
| | | int distToCounterRecord1 = 0; |
| | | int distBackToCounterRecord2 = 0; |
| | | int count=0; |
| | | Cursor cursor = null; |
| | | Transaction txn = null; |
| | | OperationStatus status; |
| | | try |
| | | { |
| | | ChangeNumber cn ; |
| | | |
| | | if ((start==null)&&(stop==null)) |
| | | return (int)db.count(); |
| | | |
| | | // Step 1 : from the start point, traverse db to the next counter record |
| | | // or to the stop point. |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | cursor = db.openCursor(txn, null); |
| | | if (start != null) |
| | | { |
| | | key = new ReplicationKey(start); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | // test whether the record is a regular change or a counter |
| | | String csnString = new String(key.getData(), "UTF-8"); |
| | | cn = new ChangeNumber(csnString); |
| | | if (cn.getServerId() != 0) |
| | | { |
| | | // reached a regular change record |
| | | // test whether we reached the 'stop' target |
| | | if (!cn.newer(stop)) |
| | | { |
| | | // let's loop |
| | | distToCounterRecord1++; |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | { |
| | | // reached the end |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord1 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | } |
| | | cursor.close(); |
| | | |
| | | // cases |
| | | // |
| | | if (counterRecord1==0) |
| | | return distToCounterRecord1; |
| | | |
| | | // Step 2 : from the stop point, traverse db to the next counter record |
| | | // or to the start point. |
| | | txn = null; |
| | | data = new DatabaseEntry(); |
| | | key = new ReplicationKey(stop); |
| | | cursor = db.openCursor(txn, null); |
| | | status = cursor.getSearchKey(key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | } |
| | | else |
| | | { |
| | | key = new DatabaseEntry(); |
| | | data = new DatabaseEntry(); |
| | | status = cursor.getLast(key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.SUCCESS) |
| | | { |
| | | /* database is empty */ |
| | | return 0; |
| | | } |
| | | } |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | cn = new ChangeNumber(new String(key.getData(), "UTF-8")); |
| | | if (!ReplicationDB.isaCounter(cn)) |
| | | { |
| | | // regular change record |
| | | if (!cn.older(start)) |
| | | { |
| | | distBackToCounterRecord2++; |
| | | status = cursor.getPrev(key, data, LockMode.DEFAULT); |
| | | } |
| | | else |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // counter record |
| | | counterRecord2 = decodeCounterValue(data.getData()); |
| | | break; |
| | | } |
| | | } |
| | | cursor.close(); |
| | | |
| | | // Step 3 : Now consolidates the result |
| | | if (counterRecord1!=0) |
| | | { |
| | | if (counterRecord1 == counterRecord2) |
| | | { |
| | | // only one cp between from and to - no need to use it |
| | | count = distToCounterRecord1 + distBackToCounterRecord2; |
| | | } |
| | | else |
| | | { |
| | | // 2 cp between from and to |
| | | count = distToCounterRecord1 + (counterRecord2-counterRecord1) |
| | | + distBackToCounterRecord2; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | finally |
| | | { |
| | | if (cursor != null) |
| | | cursor.close(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | | { |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The ReplicationServer is shuting down. |
| | | } |
| | | } |
| | | } |
| | | return count; |
| | | } |
| | | |
| | | /** |
| | | * Test if a provided changeNumber represents a counter record. |
| | | * @param cn The provided changeNumber. |
| | | * @return True if the provided changenumber is a counter. |
| | | */ |
| | | static private boolean isaCounter(ChangeNumber cn) |
| | | { |
| | | return ((cn.getServerId()== 0) && (cn.getSeqnum()==0)); |
| | | } |
| | | |
| | | /** |
| | | * Decode the provided database entry as a the value of a counter. |
| | | * @param entry The provided entry. |
| | | * @return The counter value. |
| | | * @throws DataFormatException |
| | | */ |
| | | private static int decodeCounterValue(byte[] entry) |
| | | throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | String numAckStr = new String(entry, 0, entry.length, "UTF-8"); |
| | | return Integer.parseInt(numAckStr); |
| | | |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Encode the provided counter value in a database entry. |
| | | * @param entry The provided entry. |
| | | * @return The databse entry with the counter value encoded inside.. |
| | | * @throws UnsupportedEncodingException |
| | | */ |
| | | static private DatabaseEntry encodeCounterValue(int value) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | DatabaseEntry entry = new DatabaseEntry(); |
| | | entry.setData(String.valueOf(value).getBytes("UTF-8")); |
| | | return entry; |
| | | } |
| | | |
| | | /** |
| | | * Set the counter writing window size (public method for unit tests only). |
| | | * @param size Size in number of record. |
| | | */ |
| | | public void setCounterWindowSize(int size) |
| | | { |
| | | this.counterWindowSize = size; |
| | | } |
| | | |
| | | } |