opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -297,4 +297,13 @@ { return parentUniqueId; } /** * {@inheritDoc} */ @Override public int size() { return encodedAttributes.length + 40; } } opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -122,4 +122,15 @@ { return ("DEL " + getDn() + " " + getChangeNumber()); } /** * {@inheritDoc} */ @Override public int size() { // The DeleteMsg size is mostly dependent on the DN and should never // grow very large. It is therefore safe to assume an average of 40 bytes. return 40; } } opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -360,4 +360,15 @@ } } /** * {@inheritDoc} */ @Override public int size() { // The MODDN message size are mainly dependent on the // size of the DN. let's assume that they average on 100 bytes return 100; } } opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -235,4 +235,16 @@ { return("MOD " + getDn() + " " + getChangeNumber()); } /** * {@inheritDoc} */ @Override public int size() { // The ModifyMsh can be very large when added or deleted attribute // values are very large. We therefore need to count the // whole encoded msg. return encodedMsg.length; } } opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -362,4 +362,11 @@ } } /** * Return the number of bytes used by this message. * * @return The number of bytes used by this message. */ public abstract int size(); } opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@ // private final LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>(); // The High and low water mark for the max size of the msgQueue. // the threads calling add() method will be blocked if the size of // msgQueue becomes larger than the queueHimark and will resume // only when the size of the msgQueue goes below queueLowmark. int queueHimark = 5000; int queueLowmark = 4000; // The queue himark and lowmark in bytes, this is set to 100 times the // himark and lowmark in number of updates. int queueHimarkBytes = 100 * queueHimark; int queueLowmarkBytes = 100 * queueLowmark; // The number of bytes currently in the queue int queueByteSize = 0; private ReplicationDB db; private ChangeNumber firstChange = null; private ChangeNumber lastChange = null; @@ -93,14 +109,6 @@ private final Object flushLock = new Object(); private ReplicationServer replicationServer; // The High and low water mark for the max size of the msgQueue. // the threads calling add() method will be blocked if the size of // msgQueue becomes larger than the MSG_QUEUE_HIMARK and will resume // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK. final static int MSG_QUEUE_HIMARK = 5000; final static int MSG_QUEUE_LOWMARK = 4000; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; @@ -120,16 +128,22 @@ * @param replicationServer The ReplicationServer that creates this dbHandler. * @param dbenv the Database Env to use to create the ReplicationServer DB. * server for this domain. * @param queueSize The queueSize to use when creating the dbHandler. * @throws DatabaseException If a database problem happened */ public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, ReplicationDbEnv dbenv) public DbHandler( short id, DN baseDn, ReplicationServer replicationServer, ReplicationDbEnv dbenv, int queueSize) throws DatabaseException { this.replicationServer = replicationServer; this.serverId = id; serverId = id; this.baseDn = baseDn; this.trimage = replicationServer.getTrimage(); trimage = replicationServer.getTrimage(); queueHimark = queueSize; queueLowmark = queueSize * 4 / 5; queueHimarkBytes = 100 * queueHimark; queueLowmarkBytes = 100 * queueLowmark; db = new ReplicationDB(id, baseDn, replicationServer, dbenv); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); @@ -156,7 +170,7 @@ synchronized (msgQueue) { int size = msgQueue.size(); while (size > MSG_QUEUE_HIMARK) while ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) { try { @@ -168,6 +182,7 @@ size = msgQueue.size(); } queueByteSize += update.size(); msgQueue.add(update); if (lastChange == null || lastChange.older(update.getChangeNumber())) { @@ -308,10 +323,12 @@ int current = 0; while ((current < number) && (!msgQueue.isEmpty())) { msgQueue.remove(); UpdateMessage msg = msgQueue.remove(); queueByteSize -= msg.size(); current++; } if (msgQueue.size() < MSG_QUEUE_LOWMARK) if ((msgQueue.size() < queueLowmark) && (queueByteSize < queueLowmarkBytes)) msgQueue.notify(); } } @@ -475,13 +492,14 @@ private void flush() { int size; int chunksize = (500 < queueHimark ? 500 : queueHimark); do { synchronized(flushLock) { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); List<UpdateMessage> changes = getChanges(chunksize); // if no more changes to save exit immediately. if ((changes == null) || ((size = changes.size()) == 0)) @@ -493,7 +511,7 @@ // remove the changes from the list of changes to be saved. clearQueue(changes.size()); } } while (size >=500); } while (size >= chunksize); } /** @@ -529,6 +547,10 @@ attributes.add(new Attribute("last-change", lastChange.toString() + " " + lastTime.toString())); } attributes.add( new Attribute("queue-size", String.valueOf(msgQueue.size()))); attributes.add( new Attribute("queue-size-bytes", String.valueOf(queueByteSize))); return attributes; } @@ -604,6 +626,7 @@ synchronized(flushLock) { msgQueue.clear(); queueByteSize = 0; } db.clear(); firstChange = db.readFirstChange(); opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -42,6 +42,9 @@ private SortedMap<ChangeNumber, UpdateMessage> map = new TreeMap<ChangeNumber, UpdateMessage>(); // The total number of bytes for all the message in the queue. private int bytesCount = 0; /** * Return the first UpdateMessage in the MsgQueue. * @@ -67,12 +70,22 @@ * * @return The number of elements in this MsgQueue. */ public int size() public int count() { return map.size(); } /** * Returns the number of bytes in this MsgQueue. * * @return The number of bytes in this MsgQueue. */ public int bytesCount() { return bytesCount; } /** * Returns <tt>true</tt> if this MsgQueue contains no UpdateMessage. * * @return <tt>true</tt> if this MsgQueue contains no UpdateMessage. @@ -91,6 +104,7 @@ public void add(UpdateMessage update) { map.put(update.getChangeNumber(), update); bytesCount += update.size(); } /** @@ -102,6 +116,7 @@ { UpdateMessage msg = map.get(map.firstKey()); map.remove(msg.getChangeNumber()); bytesCount -= msg.size(); return msg; } @@ -126,5 +141,6 @@ public void clear() { map.clear(); bytesCount = 0; } } opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -257,7 +257,9 @@ + " serverId=" + serverId); DbHandler dbHandler = new DbHandler(serverId, baseDn, replicationServer, this); new DbHandler( serverId, baseDn, replicationServer, this, replicationServer.getQueueSize()); replicationServer.getReplicationServerDomain(baseDn, true). setDbHandler(serverId, dbHandler); opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -134,7 +134,7 @@ // ID of the backend private static final String backendId = "replicationChanges"; // At startup, the listen thread wait on this flag for the connet // At startup, the listen thread wait on this flag for the connect // thread to look for other servers in the topology. private boolean connectedInTopology = false; private final Object connectedInTopologyLock = new Object(); @@ -554,7 +554,7 @@ public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException { return new DbHandler(id, baseDn, this, dbEnv); return new DbHandler(id, baseDn, this, dbEnv, queueSize); } /** @@ -827,6 +827,17 @@ } /** * Get the queueSize for this replication server. * * @return The maximum size of the queues for this Replication Server * */ public int getQueueSize() { return queueSize; } /** * Creates the backend associated to this replication server. * @throws ConfigException */ opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@ private int maxSendQueue = 0; private int maxReceiveDelay = 0; private int maxSendDelay = 0; private int maxQueueSize = 10000; private int maxQueueSize = 5000; private int maxQueueBytesSize = maxQueueSize * 100; private int restartReceiveQueue; private int restartSendQueue; private int restartReceiveDelay; @@ -115,8 +116,8 @@ private Semaphore sendWindow; private int sendWindowSize; private boolean flowControl = false; // indicate that the server is // flow controled and should // be stopped from sending messsages. // flow controlled and should // be stopped from sending messages. private int saturationCount = 0; private short replicationServerId; @@ -165,6 +166,7 @@ super("Server Handler"); this.session = session; this.maxQueueSize = queueSize; this.maxQueueBytesSize = queueSize * 100; this.protocolVersion = ProtocolVersion.currentVersion(); } @@ -305,7 +307,7 @@ session.publish(myStartMsg); sendWindowSize = receivedMsg.getWindowSize(); /* Until here session is encrypted then it depends on the negociation */ /* Until here session is encrypted then it depends on the negotiation */ if (!sslEncryption) { session.stopEncryption(); @@ -352,7 +354,7 @@ } else { // We are an empty Replicationserver // We are an empty Replication Server if ((generationId>0)&&(!serverState.isEmpty())) { // If the LDAP server has already sent changes @@ -722,7 +724,7 @@ { synchronized (msgQueue) { int size = msgQueue.size(); int size = msgQueue.count(); if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) return true; @@ -764,7 +766,7 @@ { synchronized (msgQueue) { int queueSize = msgQueue.size(); int queueSize = msgQueue.count(); if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) return false; if ((source != null) && (source.maxSendQueue > 0) && @@ -815,7 +817,7 @@ * the number of updates to be sent is the size of the receive queue. */ if (isFollowing()) return msgQueue.size(); return msgQueue.count(); else { /* @@ -1034,7 +1036,8 @@ /* TODO : size should be configurable * and larger than max-receive-queue-size */ while (msgQueue.size() > maxQueueSize) while ((msgQueue.count() > maxQueueSize) || (msgQueue.bytesCount() > maxQueueBytesSize)) { setFollowing(false); msgQueue.removeFirst(); @@ -1164,11 +1167,13 @@ // The loop below relies on the fact that it is sorted based // on the currentChange of each iterator to consider the next // change accross all servers. // change across all servers. // Hence it is necessary to remove and eventual add again an iterator // when looping in order to keep consistent the order of the // iterators (see ReplicationIteratorComparator. while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) while (!iteratorSortedSet.isEmpty() && (lateQueue.count()<100) && (lateQueue.bytesCount()<50000) ) { ReplicationIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); @@ -1190,7 +1195,8 @@ { synchronized (msgQueue) { if (msgQueue.size() < maxQueueSize) if ((msgQueue.count() < maxQueueSize) && (msgQueue.bytesCount() < maxQueueBytesSize)) { setFollowing(true); } @@ -1203,7 +1209,7 @@ { if (msgQueue.contains(msg)) { /* we finally catched up with the regular queue */ /* we finally catch up with the regular queue */ setFollowing(true); lateQueue.clear(); UpdateMessage msg1; @@ -1266,7 +1272,7 @@ * Update the serverState with the last message sent. * * @param msg the last update sent. * @return boolean indicating if the update was meaningfull. * @return boolean indicating if the update was meaningful. */ public boolean updateServerState(UpdateMessage msg) { @@ -1599,6 +1605,15 @@ } } attributes.add( new Attribute("queue-size", String.valueOf(msgQueue.count()))); attributes.add( new Attribute( "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); attributes.add( new Attribute( "following", String.valueOf(following))); // Deprecated attributes.add(new Attribute("max-waiting-changes", String.valueOf(maxQueueSize))); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,8 @@ ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); DbHandler handler = new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); new DbHandler( (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000); ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); ChangeNumber changeNumber1 = gen.newChangeNumber(); @@ -153,7 +154,8 @@ ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); DbHandler handler = new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); new DbHandler( (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000); // Creates changes added to the dbHandler ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);