| | |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.LinkedList; |
| | | import java.util.NoSuchElementException; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | |
| | | * |
| | | */ |
| | | private long trimage; |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | |
| | | |
| | | /** |
| | | * Get some changes out of the message queue of the LDAP server. |
| | | * |
| | | * @param number the number of messages to extract. |
| | | * (from the beginning of the queue) |
| | | * @param number the maximum number of messages to extract. |
| | | * @return a List containing number changes extracted from the queue. |
| | | */ |
| | | private List<UpdateMsg> getChanges(int number) |
| | |
| | | public ReplicationIterator generateIterator(ChangeNumber changeNumber) |
| | | throws DatabaseException, Exception |
| | | { |
| | | /* |
| | | * When we create an iterator we need to make sure that we |
| | | * don't miss some changes because the iterator is created |
| | | * close to the limit of the changed that have not yet been |
| | | * flushed to the database. |
| | | * We detect this by comparing the date of the changeNumber where |
| | | * we want to start with the date of the first ChangeNumber |
| | | * of the msgQueue. |
| | | * If this is the case we flush the queue to the database. |
| | | */ |
| | | ChangeNumber recentChangeNumber = null; |
| | | |
| | | if (changeNumber == null) |
| | | { |
| | | flush(); |
| | | } |
| | | synchronized (msgQueue) |
| | | { |
| | | try |
| | | { |
| | | UpdateMsg msg = msgQueue.getLast(); |
| | | recentChangeNumber = msg.getChangeNumber(); |
| | | } |
| | | catch (NoSuchElementException e) |
| | | {} |
| | | } |
| | | |
| | | if ( (recentChangeNumber != null) && (changeNumber != null)) |
| | | { |
| | | if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) || |
| | | ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20)) |
| | | { |
| | | flush(); |
| | | } |
| | | } |
| | | |
| | | ReplicationIterator it = |
| | | new ReplicationIterator(serverId, db, changeNumber); |
| | | |
| | | new ReplicationIterator(serverId, db, changeNumber, this); |
| | | return it; |
| | | } |
| | | |
| | | /** |
| | | * Removes message in a subList of the msgQueue from the msgQueue. |
| | | * Removes the provided number of messages from the beginning of the msgQueue. |
| | | * |
| | | * @param number the number of changes to be removed. |
| | | */ |
| | |
| | | int current = 0; |
| | | while ((current < number) && (!msgQueue.isEmpty())) |
| | | { |
| | | UpdateMsg msg = msgQueue.remove(); |
| | | UpdateMsg msg = msgQueue.remove(); // remove first |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | | * Flush is done by chunk sized to 500 messages, starting from the |
| | | * beginning of the list. |
| | | */ |
| | | private void flush() |
| | | public void flush() |
| | | { |
| | | int size; |
| | | int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); |
| | |
| | | { |
| | | synchronized(flushLock) |
| | | { |
| | | // get N messages to save in the DB |
| | | // get N (or less) messages from the queue to save to the DB |
| | | // (from the beginning of the queue) |
| | | List<UpdateMsg> changes = getChanges(chunksize); |
| | | |
| | | // if no more changes to save exit immediately. |
| | |
| | | // save the change to the stable storage. |
| | | db.addEntries(changes); |
| | | |
| | | // remove the changes from the list of changes to be saved. |
| | | // remove the changes from the list of changes to be saved |
| | | // (remove from the beginning of the queue) |
| | | clearQueue(changes.size()); |
| | | } |
| | | } while (size >= chunksize); |
| | |
| | | { |
| | | return this.serverId; |
| | | } |
| | | |
| | | /** |
| | | * Return the size of the msgQueue (the memory cache of the DbHandler). |
| | | * For test purpose. |
| | | * @return The memory queue size. |
| | | */ |
| | | public int getQueueSize() |
| | | { |
| | | return this.msgQueue.size(); |
| | | } |
| | | } |