| | |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | | * server in the topology. |
| | | * It is responsible for efficiently saving the updates that are received from |
| | | * each master server into stable storage. |
| | | * This class is also able to generate a {@link ChangeNumberIndexDBCursor} that |
| | | * can be used to read all changes from a given draft ChangeNumber. |
| | | * server in the topology. It is responsible for efficiently saving the updates |
| | | * that are received from each master server into stable storage. This class is |
| | | * also able to generate a {@link ChangeNumberIndexDBCursor} that can be used to |
| | | * read all changes from a given change number. |
| | | * <p> |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | |
| | | |
| | | private DraftCNDB db; |
| | | /** |
| | | * FIXME Is this field that useful? {@link #getFirstDraftCN()} does not even |
| | | * use it! |
| | | * FIXME Is this field that useful? {@link #getFirstChangeNumber()} does not |
| | | * even use it! |
| | | */ |
| | | private int firstDraftCN = NO_KEY; |
| | | private int firstChangeNumber = NO_KEY; |
| | | /** |
| | | * FIXME Is this field that useful? {@link #getLastDraftCN()} does not even |
| | | * use it! It is not even updated. |
| | | * FIXME Is this field that useful? {@link #getLastChangeNumber()} does not |
| | | * even use it! |
| | | */ |
| | | private int lastDraftCN = NO_KEY; |
| | | private int lastChangeNumber = NO_KEY; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private boolean shutdown = false; |
| | | private boolean trimDone = false; |
| | |
| | | |
| | | // DB initialization |
| | | db = new DraftCNDB(dbenv); |
| | | firstDraftCN = db.readFirstDraftCN(); |
| | | lastDraftCN = db.readLastDraftCN(); |
| | | firstChangeNumber = db.readFirstChangeNumber(); |
| | | lastChangeNumber = db.readLastChangeNumber(); |
| | | |
| | | // Trimming thread |
| | | thread = new DirectoryThread(this, "Replication DraftCN db"); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public synchronized void add(int draftCN, String value, String baseDN, |
| | | CSN csn) |
| | | public synchronized void add(int changeNumber, String previousCookie, |
| | | String baseDN, CSN csn) |
| | | { |
| | | db.addEntry(draftCN, value, baseDN, csn); |
| | | db.addEntry(changeNumber, previousCookie, baseDN, csn); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In DraftCNDbhandler.add, added: " |
| | | + " key=" + draftCN |
| | | + " value=" + value |
| | | + " key=" + changeNumber |
| | | + " previousCookie=" + previousCookie |
| | | + " baseDN=" + baseDN |
| | | + " csn=" + csn); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int getFirstDraftCN() |
| | | public int getFirstChangeNumber() |
| | | { |
| | | return db.readFirstDraftCN(); |
| | | return db.readFirstChangeNumber(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int getLastDraftCN() |
| | | public int getLastChangeNumber() |
| | | { |
| | | return db.readLastDraftCN(); |
| | | return db.readLastChangeNumber(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * <ul> |
| | | * <li>open a cursor, check if the next entry exists, then close the cursor |
| | | * </li> |
| | | * <li>call <code>db.readFirstDraftCN() != 0</code></li> |
| | | * <li>call <code>db.readFirstChangeNumber() != 0</code></li> |
| | | * </ul> |
| | | */ |
| | | @Override |
| | |
| | | * <p> |
| | | * This method is only used by unit tests. |
| | | * |
| | | * @param startingDraftCN |
| | | * The draft change number from where to start. |
| | | * @param startChangeNumber |
| | | * The change number from where to start. |
| | | * @return the new cursor. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | DraftCNDBCursor getReadCursor(int startingDraftCN) throws ChangelogException |
| | | DraftCNDBCursor getReadCursor(int startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return db.openReadCursor(startingDraftCN); |
| | | return db.openReadCursor(startChangeNumber); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDBCursor getCursorFrom(int startDraftCN) |
| | | public ChangeNumberIndexDBCursor getCursorFrom(int startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return new DraftCNDbIterator(db, startDraftCN); |
| | | return new DraftCNDbIterator(db, startChangeNumber); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | final ServerState startState = domain.getStartState(); |
| | | final CSN fcsn = startState.getCSN(csn.getServerId()); |
| | | |
| | | final int currentDraftCN = cursor.currentKey(); |
| | | final int currentChangeNumber = cursor.currentKey(); |
| | | |
| | | if (csn.older(fcsn)) |
| | | { |
| | |
| | | continue; |
| | | } |
| | | |
| | | firstDraftCN = currentDraftCN; |
| | | firstChangeNumber = currentChangeNumber; |
| | | cursor.close(); |
| | | return; |
| | | } |
| | |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("first-draft-changenumber", |
| | | Integer.toString(db.readFirstDraftCN()))); |
| | | Integer.toString(db.readFirstChangeNumber()))); |
| | | attributes.add(Attributes.create("last-draft-changenumber", |
| | | Integer.toString(db.readLastDraftCN()))); |
| | | Integer.toString(db.readLastChangeNumber()))); |
| | | attributes.add(Attributes.create("count", |
| | | Long.toString(count()))); |
| | | return attributes; |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Draft Changelog"; |
| | | return "ChangeNumber Index Database"; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "draftCNdb:" + " " + firstDraftCN + " " + lastDraftCN; |
| | | return "draftCNdb:" + " " + firstChangeNumber + " " + lastChangeNumber; |
| | | } |
| | | |
| | | /** |
| | |
| | | public void clear() throws ChangelogException |
| | | { |
| | | db.clear(); |
| | | firstDraftCN = db.readFirstDraftCN(); |
| | | lastDraftCN = db.readLastDraftCN(); |
| | | firstChangeNumber = db.readFirstChangeNumber(); |
| | | lastChangeNumber = db.readLastChangeNumber(); |
| | | } |
| | | |
| | | private ReentrantLock lock = new ReentrantLock(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getPreviousCookie(int draftCN) |
| | | public String getPreviousCookie(int changeNumber) |
| | | { |
| | | DraftCNDBCursor draftCNDBCursor = null; |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | draftCNDBCursor = db.openReadCursor(draftCN); |
| | | return draftCNDBCursor.currentValue(); |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentValue(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugException("getValue", draftCN, e); |
| | | debugException("getValue", changeNumber, e); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(draftCNDBCursor); |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getCSN(int draftCN) |
| | | public CSN getCSN(int changeNumber) |
| | | { |
| | | DraftCNDBCursor draftCNDBCursor = null; |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | draftCNDBCursor = db.openReadCursor(draftCN); |
| | | return draftCNDBCursor.currentCSN(); |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentCSN(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugException("getCSN", draftCN, e); |
| | | debugException("getCSN", changeNumber, e); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(draftCNDBCursor); |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /**{@inheritDoc}*/ |
| | | @Override |
| | | public String getBaseDN(int draftCN) |
| | | public String getBaseDN(int changeNumber) |
| | | { |
| | | DraftCNDBCursor draftCNDBCursor = null; |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | | { |
| | | draftCNDBCursor = db.openReadCursor(draftCN); |
| | | return draftCNDBCursor.currentBaseDN(); |
| | | cursor = db.openReadCursor(changeNumber); |
| | | return cursor.currentBaseDN(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugException("getBaseDN", draftCN, e); |
| | | debugException("getBaseDN", changeNumber, e); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(draftCNDBCursor); |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void debugException(String methodName, int draftCN, Exception e) |
| | | private void debugException(String methodName, int changeNumber, Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: " |
| | | + " key=" + draftCN + " value returned is null" |
| | | + " first="+ db.readFirstDraftCN() |
| | | + " last=" + db.readLastDraftCN() |
| | | + " key=" + changeNumber + " value returned is null" |
| | | + " first="+ db.readFirstChangeNumber() |
| | | + " last=" + db.readLastChangeNumber() |
| | | + " count=" + db.count() |
| | | + " exception " + e + " " + e.getMessage()); |
| | | } |