opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,7 +28,6 @@ import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -36,7 +35,6 @@ import java.util.Date; import java.util.List; import java.util.LinkedList; import java.util.NoSuchElementException; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; @@ -47,7 +45,6 @@ import org.opends.server.types.InitializationException; import org.opends.server.util.TimeThread; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; @@ -123,7 +120,6 @@ * */ private long trimage; private static final DebugTracer TRACER = getTracer(); /** * Creates a new dbHandler associated to a given LDAP server. @@ -206,8 +202,8 @@ /** * Get some changes out of the message queue of the LDAP server. * * @param number the number of messages to extract. * (from the beginning of the queue) * @param number the maximum number of messages to extract. * @return a List containing number changes extracted from the queue. */ private List<UpdateMsg> getChanges(int number) @@ -281,50 +277,17 @@ public ReplicationIterator generateIterator(ChangeNumber changeNumber) throws DatabaseException, Exception { /* * When we create an iterator we need to make sure that we * don't miss some changes because the iterator is created * close to the limit of the changed that have not yet been * flushed to the database. * We detect this by comparing the date of the changeNumber where * we want to start with the date of the first ChangeNumber * of the msgQueue. * If this is the case we flush the queue to the database. */ ChangeNumber recentChangeNumber = null; if (changeNumber == null) { flush(); } synchronized (msgQueue) { try { UpdateMsg msg = msgQueue.getLast(); recentChangeNumber = msg.getChangeNumber(); } catch (NoSuchElementException e) {} } if ( (recentChangeNumber != null) && (changeNumber != null)) { if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) || ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20)) { flush(); } } ReplicationIterator it = new ReplicationIterator(serverId, db, changeNumber); new ReplicationIterator(serverId, db, changeNumber, this); return it; } /** * Removes message in a subList of the msgQueue from the msgQueue. * Removes the provided number of messages from the beginning of the msgQueue. * * @param number the number of changes to be removed. */ @@ -335,7 +298,7 @@ int current = 0; while ((current < number) && (!msgQueue.isEmpty())) { UpdateMsg msg = msgQueue.remove(); UpdateMsg msg = msgQueue.remove(); // remove first queueByteSize -= msg.size(); current++; } @@ -508,8 +471,10 @@ /** * Flush a number of updates from the memory list to the stable storage. * Flush is done by chunk sized to 500 messages, starting from the * beginning of the list. */ private void flush() public void flush() { int size; int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); @@ -518,7 +483,8 @@ { synchronized(flushLock) { // get N messages to save in the DB // get N (or less) messages from the queue to save to the DB // (from the beginning of the queue) List<UpdateMsg> changes = getChanges(chunksize); // if no more changes to save exit immediately. @@ -528,7 +494,8 @@ // save the change to the stable storage. db.addEntries(changes); // remove the changes from the list of changes to be saved. // remove the changes from the list of changes to be saved // (remove from the beginning of the queue) clearQueue(changes.size()); } } while (size >= chunksize); @@ -668,4 +635,14 @@ { return this.serverId; } /** * Return the size of the msgQueue (the memory cache of the DbHandler). * For test purpose. * @return The memory queue size. */ public int getQueueSize() { return this.msgQueue.size(); } } opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -40,6 +40,9 @@ { private UpdateMsg currentChange = null; private ReplServerDBCursor cursor = null; private DbHandler dbh; private ReplicationDB db; ChangeNumber lastNonNullCurrentCN; /** * Creates a new ReplicationIterator. @@ -49,18 +52,40 @@ * @param id the Identifier of the server on which the iterator applies. * @param db The db where the iterator must be created. * @param changeNumber The ChangeNumber after which the iterator must start. * @param dbh The associated DbHandler. * @throws Exception If there is no other change to push after change * with changeNumber number. * @throws DatabaseException if a database problem happened. */ public ReplicationIterator( short id, ReplicationDB db, ChangeNumber changeNumber) short id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh) throws Exception, DatabaseException { cursor = db.openReadCursor(changeNumber); this.db = db; this.dbh = dbh; this.lastNonNullCurrentCN = changeNumber; try { cursor = db.openReadCursor(changeNumber); } catch(Exception e) { // we didn't find it in the db cursor = null; } if (cursor == null) { throw new Exception("no new change"); // flush the queue into the db dbh.flush(); // look again in the db cursor = db.openReadCursor(changeNumber); if (cursor == null) { throw new Exception("no new change"); } } } @@ -80,18 +105,47 @@ */ public boolean next() { currentChange = cursor.next(); boolean hasNext = false; currentChange = cursor.next(); // can return null if (currentChange != null) return true; { lastNonNullCurrentCN = currentChange.getChangeNumber(); hasNext = true; } else { // TODO : should check here if some changes are still in the // dbHandler message queue and not yet saved to the backing database // if yes should get change from there from now on. return false; synchronized (this) { if (cursor != null) { cursor.close(); cursor = null; } dbh.flush(); try { cursor = db.openReadCursor(lastNonNullCurrentCN); currentChange = cursor.next(); // can return null lastNonNullCurrentCN = currentChange.getChangeNumber(); if (currentChange != null) { hasNext = true; } else { hasNext = false; } } catch(Exception e) { currentChange = null; hasNext = false; } } } return hasNext; } /** @@ -108,6 +162,8 @@ cursor.close(); cursor = null; } this.dbh = null; this.db = null; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; @@ -50,6 +50,7 @@ ReplicationServer replicationServer = null; ReplicationDbEnv dbEnv = null; DbHandler handler = null; ReplicationIterator it = null; try { TestCaseUtils.startServer(); @@ -85,22 +86,145 @@ ChangeNumber changeNumber1 = gen.newChangeNumber(); ChangeNumber changeNumber2 = gen.newChangeNumber(); ChangeNumber changeNumber3 = gen.newChangeNumber(); ChangeNumber changeNumber4 = gen.newChangeNumber(); ChangeNumber changeNumber5 = gen.newChangeNumber(); DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"); DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"); DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"); "uid"); DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid"); handler.add(update1); handler.add(update2); handler.add(update3); // The ChangeNumber should not get purged //-- // Iterator tests with memory queue only populated // verify that memory queue is populated assertEquals(handler.getQueueSize(),3); // Iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber() + " Expect change number=" + changeNumber2); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); it.releaseCursor(); it=null; // Iterator from NON existing CN Exception ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); //-- // Iterator tests with db only populated Thread.sleep(1000); // let the time for flush to happen // verify that memory queue is empty (all changes flushed in the db) assertEquals(handler.getQueueSize(),0); // Test iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); it.releaseCursor(); it=null; // Iterator from NON existing CN ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); // Test first and last assertEquals(changeNumber1, handler.getFirstChange()); assertEquals(changeNumber3, handler.getLastChange()); //-- // Iterator tests with db and memory queue populated // all changes in the db - add one in the memory queue handler.add(update4); // verify memory queue contains this one assertEquals(handler.getQueueSize(),1); // Test iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); assertTrue(it.getChange()==null); it.releaseCursor(); it=null; // Test iterator from existing CN at the limit between queue and db it = handler.generateIterator(changeNumber3); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); assertTrue(it.getChange()==null); it.releaseCursor(); it=null; // Test iterator from existing CN at the limit between queue and db it = handler.generateIterator(changeNumber4); assertFalse(it.next()); assertTrue(it.getChange()==null, " Actual change number=" + it.getChange()); it.releaseCursor(); it=null; // Test iterator from NON existing CN ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); handler.setPurgeDelay(1); boolean purged = false; @@ -109,8 +233,8 @@ { ChangeNumber firstChange = handler.getFirstChange(); ChangeNumber lastChange = handler.getLastChange(); if ((!firstChange.equals(changeNumber3) || (!lastChange.equals(changeNumber3)))) if ((!firstChange.equals(changeNumber4) || (!lastChange.equals(changeNumber4)))) { TestCaseUtils.sleep(100); } else @@ -120,6 +244,11 @@ } } finally { if (it != null) { it.releaseCursor(); it=null; } if (handler != null) handler.shutdown(); if (dbEnv != null)