mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.30.2013 0a51f5fbeb5e99c52f1be8973ae656de34fab75f
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
           *           unlock memory tree
           *           restart as usual
           *   load this change on the delayList
           *
           */
          SortedSet<ReplicationIterator> iteratorSortedSet =
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
          try
          {
            /* fill the lateQueue */
            for (int serverId : replicationServerDomain.getServers())
            {
              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
              ReplicationIterator iterator = replicationServerDomain
                  .getChangelogIterator(serverId, lastCsn);
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            iteratorSortedSet = collectAllIteratorsWithChanges();
            /* fill the lateQueue */
            // The loop below relies on the fact that it is sorted based
            // on the currentChange of each iterator to consider the next
            // change across all servers.
@@ -320,22 +301,12 @@
              ReplicationIterator iterator = iteratorSortedSet.first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              if (iterator.next())
              {
                iteratorSortedSet.add(iterator);
              }
              else
              {
                iterator.releaseCursor();
              }
              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
            }
          }
          finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
            releaseAllIterators(iteratorSortedSet);
          }
          /*
@@ -343,7 +314,6 @@
           * messages in the replication log so the remote serevr is not
           * late anymore.
           */
          if (lateQueue.isEmpty())
          {
            synchronized (msgQueue)
@@ -430,6 +400,19 @@
    return null;
  }
  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
      ReplicationIterator iter)
  {
    if (iter.next())
    {
      iterators.add(iter);
    }
    else
    {
      iter.releaseCursor();
    }
  }
  /**
   * Get the older Change Number for that server.
   * Returns null when the queue is empty.
@@ -450,7 +433,12 @@
      }
      else
      {
        if (lateQueue.isEmpty())
        if (!lateQueue.isEmpty())
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
        else
        {
          /*
          following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
          there. So let's take the last change not sent directly from
          the db.
          */
          SortedSet<ReplicationIterator> iteratorSortedSet =
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          SortedSet<ReplicationIterator> iteratorSortedSet = null;
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
            for (int serverId : replicationServerDomain.getServers())
            {
              // get the last already sent CN from that server
              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              /*
              if that iterator has changes, then it is a candidate
              it is added in the sorted list at a position given by its
              current change (see ReplicationIteratorComparator).
              */
              if (iterator != null)
              {
                if (iterator.getChange() != null)
                {
                  iteratorSortedSet.add(iterator);
                }
                else
                {
                  iterator.releaseCursor();
                }
              }
            }
            iteratorSortedSet = collectAllIteratorsWithChanges();
            UpdateMsg msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          } catch (Exception e)
@@ -497,21 +459,58 @@
            result = null;
          } finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
            releaseAllIterators(iteratorSortedSet);
          }
        } else
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
    }
    return result;
  }
  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
  {
    SortedSet<ReplicationIterator> results =
        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
    // Build a list of candidates iterator (i.e. db i.e. server)
    for (int serverId : replicationServerDomain.getServers())
    {
      // get the last already sent CN from that server
      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
      // get an iterator in this server db from that last change
      ReplicationIterator iter =
        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
      /*
      if that iterator has changes, then it is a candidate
      it is added in the sorted list at a position given by its
      current change (see ReplicationIteratorComparator).
      */
      if (iter != null)
      {
        if (iter.getChange() != null)
        {
          results.add(iter);
        }
        else
        {
          iter.releaseCursor();
        }
      }
    }
    return results;
  }
  private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
  {
    if (iterators != null)
    {
      for (ReplicationIterator iter : iterators)
      {
        iter.releaseCursor();
      }
    }
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.