mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.30.2013 407101fb21106bb8697aa771826638a41b968f3a
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -31,7 +31,6 @@
import java.util.LinkedList;
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -270,7 +269,7 @@
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened.
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
@@ -325,7 +324,16 @@
    while (msgQueue.size() != 0)
    {
      flush();
      try
      {
        flush();
      }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(e)));
      }
    }
    db.shutdown();
@@ -372,25 +380,21 @@
        }
        catch (Exception end)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(end));
          logError(mb.toMessage());
          thread.initiateShutdown();
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
          stop(end);
          break;
        }
      }
      // call flush a last time before exiting to make sure that
      // no change was forgotten in the msgQueue
      flush();
      try
      {
        // call flush a last time before exiting to make sure that
        // no change was forgotten in the msgQueue
        flush();
      }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
@@ -403,6 +407,19 @@
    }
  }
  private void stop(Exception e)
  {
    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
        .get(stackTraceToSingleLineString(e)));
    thread.initiateShutdown();
    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
@@ -489,10 +506,14 @@
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * Flush is done by chunk sized to 500 messages, starting from the
   * beginning of the list.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   *
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush()
  public void flush() throws ChangelogException
  {
    int size;
    int chunksize = Math.min(queueMaxSize, 500);