mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
05.33.2011 246e4192d3967e638aad1f12adc3e36be2aa82e2
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -350,46 +350,61 @@
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (int serverId : replicationServerDomain.getServers())
          try
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if (iterator != null)
            /* fill the lateQueue */
            for (int serverId : replicationServerDomain.getServers())
            {
              if (iterator.getChange() != null)
              ChangeNumber lastCsn = serverState
                  .getMaxChangeNumber(serverId);
              ReplicationIterator iterator = replicationServerDomain
                  .getChangelogIterator(serverId, lastCsn);
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            // The loop below relies on the fact that it is sorted based
            // on the currentChange of each iterator to consider the next
            // change across all servers.
            //
            // Hence it is necessary to remove and eventual add again an
            // iterator when looping in order to keep consistent the order of
            // the iterators (see ReplicationIteratorComparator.
            while (!iteratorSortedSet.isEmpty()
                && (lateQueue.count() < 100)
                && (lateQueue.bytesCount() < 50000))
            {
              ReplicationIterator iterator = iteratorSortedSet
                  .first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              if (iterator.next())
              {
                iteratorSortedSet.add(iterator);
              } else
              }
              else
              {
                iterator.releaseCursor();
              }
            }
          }
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() &&
              (lateQueue.count()<100) &&
              (lateQueue.bytesCount()<50000) )
          finally
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
              iteratorSortedSet.add(iterator);
            else
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
          /*
           * If the late queue is empty then we could not find any
           * messages in the replication log so the remote serevr is not
@@ -527,9 +542,16 @@
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              if ((iterator != null) && (iterator.getChange() != null))
              if (iterator != null)
              {
                iteratorSortedSet.add(iterator);
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            UpdateMsg msg = iteratorSortedSet.first().getChange();