| | |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | |
| | | * It is responsible for efficiently saving the updates that are received from |
| | | * each master server into stable storage. |
| | | * This class is also able to generate a ReplicationIterator that can be |
| | | * used to read all changes from a given ChangeNumber. |
| | | * |
| | | * This class publishes some monitoring information below cn=monitor. |
| | | * |
| | | */ |
| | | public class DbHandler implements Runnable |
| | | { |
| | | /** |
| | | * The msgQueue holds all the updates not yet saved to stable storage. |
| | | * <p> |
| | | * This list is only used as a temporary placeholder so that the writes to the |
| | | * stable storage can be grouped for efficiency reasons. Adding an update |
| | | * synchronously adds the update to this list. A dedicated thread loops on |
| | | * flush() and trim(). |
| | | * <dl> |
| | | * <dt>flush()</dt> |
| | | * <dd>gets a number of changes from the in-memory list by block and writes |
| | | * them to the db.</dd> |
| | | * <dt>trim()</dt> |
| | | * <dd>deletes from the DB a number of changes that are older than a certain |
| | | * date.</dd> |
| | | * </dl> |
| | | * <p> |
| | | * Changes are not read back by replicationServer threads that are responsible |
| | | * for pushing the changes to other replication servers or to LDAP servers. |
| | | */ |
| | | private final LinkedList<UpdateMsg> msgQueue = |
| | | new LinkedList<UpdateMsg>(); |
| | | |
| | | /** |
| | | * The high and low water marks for the max size of the msgQueue. The threads |
| | | * calling the add() method will be blocked if the size of msgQueue becomes |
| | | * larger than the queueHimark and will resume only when the size of the |
| | | * msgQueue goes below queueLowmark. |
| | | */ |
| | | private int queueMaxSize = 5000; |
| | | private int queueLowmark = 1000; |
| | | private int queueHimark = 4000; |
| | | |
| | | /** |
| | | * The queue himark and lowmark in bytes; these are set to 100 times the |
| | | * himark and lowmark in number of updates. |
| | | */ |
| | | private int queueMaxBytes = 100 * queueMaxSize; |
| | | private int queueLowmarkBytes = 100 * queueLowmark; |
| | | private int queueHimarkBytes = 100 * queueHimark; |
| | | |
| | | /** The number of bytes currently in the queue. */ |
| | | private int queueByteSize = 0; |
| | | |
| | | private ReplicationDB db; |
| | | private ChangeNumber firstChange = null; |
| | |
| | | /** |
| | | * Time value that getCount() turns into a ChangeNumber and compares with its |
| | | * upper bound. NOTE(review): presumably refreshed by trim() - confirm. |
| | | */ |
| | | private long latestTrimDate = 0; |
| | | |
| | | /** |
| | | * The trim age in milliseconds. Change records in the change DB that are |
| | | * older than this age are removed. |
| | | */ |
| | | private long trimAge; |
| | | |
| | |
| | | lastChange = update.getChangeNumber(); |
| | | } |
| | | if (firstChange == null) |
| | | { |
| | | firstChange = update.getChangeNumber(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | | if ((msgQueue.size() < queueLowmark) && |
| | | (queueByteSize < queueLowmarkBytes)) |
| | | if ((msgQueue.size() < queueLowmark) |
| | | && (queueByteSize < queueLowmarkBytes)) |
| | | { |
| | | msgQueue.notifyAll(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | while (msgQueue.size() != 0) |
| | | { |
| | | flush(); |
| | | } |
| | | |
| | | db.shutdown(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | |
| | | * Periodically Flushes the ReplicationServerDomain cache from memory to the |
| | | * stable storage and trims the old updates. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | while (!shutdown) |
| | |
| | | */ |
| | | public int getCount(ChangeNumber from, ChangeNumber to) |
| | | { |
| | | // Now that we always keep the last ChangeNumber in the DB to avoid |
| | | // expiring cookies too quickly, we need to check if the "to" |
| | | // is older than the trim date. |
| | | if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0))) |
| | | { |
| | | // Flush first, then query the DB exactly once. |
| | | flush(); |
| | | return db.count(from, to); |
| | | } |
| | | // "to" is older than the latest trim date: report no changes. |
| | | return 0; |
| | | } |
| | | |
| | | } |