mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.51.2013 0b8de8e6dbc4760fa587d12dd868c7c9f577b261
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
  private int serverId;
  private String baseDn;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean done = false;
  private DirectoryThread thread;
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
   */
  public void shutdown()
  {
    if (shutdown)
    if (thread.isShutdownInitiated())
    {
      return;
    }
    shutdown  = true;
    thread.initiateShutdown();
    synchronized (msgQueue)
    {
      msgQueue.notifyAll();
    }
    synchronized (this)
    { /* Can this be replaced with thread.join() ? */
      while (!done)
      {
        try
        {
          wait();
        }
        catch (InterruptedException e)
        { /* do nothing */}
      }
    }
    while (msgQueue.size() != 0)
    {
      flush();
@@ -351,55 +337,62 @@
  @Override
  public void run()
  {
    while (!shutdown)
    {
      try
      {
        flush();
        trim();
    thread.startWork();
        synchronized (msgQueue)
    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          if (msgQueue.size() < queueLowmark
              && queueByteSize < queueLowmarkBytes)
          flush();
          trim();
          synchronized (msgQueue)
          {
            try
            if (msgQueue.size() < queueLowmark
                && queueByteSize < queueLowmarkBytes)
            {
              msgQueue.wait(1000);
            } catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
              try
              {
                msgQueue.wait(1000);
              }
              catch (InterruptedException e)
              {
                Thread.currentThread().interrupt();
              }
            }
          }
        }
      } catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        synchronized (this)
        catch (Exception end)
        {
          // set the done variable to true so that this thread don't
          // get stuck in this dbHandler.shutdown() when it get called
          // by replicationServer.shutdown();
          done = true;
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(end));
          logError(mb.toMessage());
          thread.initiateShutdown();
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
          break;
        }
        if (replicationServer != null)
        {
          replicationServer.shutdown();
        }
        break;
      }
      // call flush a last time before exiting to make sure that
      // no change was forgotten in the msgQueue
      flush();
    }
    // call flush a last time before exiting to make sure that
    // no change was forgotten in the msgQueue
    flush();
    finally
    {
      thread.stopWork();
    }
    synchronized (this)
    {
      done = true;
      notifyAll();
    }
  }
@@ -450,11 +443,14 @@
        {
          for (int j = 0; j < 50; j++)
          {
            if (thread.isShutdownInitiated())
            {
              return;
            }
            CSN csn = cursor.nextCSN();
            if (csn == null)
            {
              cursor.close();
              done = true;
              return;
            }
@@ -465,21 +461,22 @@
            else
            {
              firstChange = csn;
              cursor.close();
              done = true;
              return;
            }
          }
          cursor.close();
        }
        catch (ChangelogException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          cursor.abort();
          shutdown = true;
          thread.initiateShutdown();
          throw e;
        }
        finally
        {
          cursor.close();
        }
      }
    }
  }