mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

abobrov
09.52.2009 54cb24af9165680e648071929bbbd271b9af1e4e
- EXPERIMENTAL Parallel Text Writer implementation.
1 files added
2 files modified
304 ■■■■■ changed files
opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/ParallelTextWriter.java 257 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java 37 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2008 Sun Microsystems, Inc.
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="file-based-access-log-publisher"
  plural-name="file-based-access-log-publishers"
@@ -74,6 +74,12 @@
      The maximum number of log records that can be stored in the
      asynchronous queue.
    </adm:synopsis>
    <adm:description>
      Setting the queue size to zero activates parallel log writer
      implementation which has no queue size limit and as such the
      parallel log writer should only be used on a very well tuned
      server configuration to avoid potential out of memory errors.
    </adm:description>
    <adm:requires-admin-action>
      <adm:other>
        <adm:synopsis>
@@ -88,7 +94,7 @@
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="1" />
      <adm:integer lower-limit="0" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
opends/src/server/org/opends/server/loggers/ParallelTextWriter.java
New file
@@ -0,0 +1,257 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.loggers;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.ServerShutdownListener;
import org.opends.server.core.DirectoryServer;
/**
 * A Text Writer which writes log records asynchronously to
 * character-based stream. Note that this implementation is
 * parallel unbound ie there is no queue size cap imposed.
 */
public class ParallelTextWriter
    implements ServerShutdownListener, TextWriter
{
  /**
   * The wrapped Text Writer.
   */
  private final TextWriter writer;
  /** Queue to store unpublished records. */
  private final ConcurrentLinkedQueue<String> queue;
  private final Semaphore queueSemaphore = new Semaphore(0, false);
  private String name;
  private AtomicBoolean stopRequested;
  private WriterThread writerThread;
  private boolean autoFlush;
  /**
   * Construct a new ParallelTextWriter wrapper.
   *
   * @param name      the name of the thread.
   * @param autoFlush indicates if the underlying writer should be flushed
   *                  after the queue is flushed.
   * @param writer    a character stream used for output.
   */
  public ParallelTextWriter(String name, boolean autoFlush, TextWriter writer)
  {
    this.name = name;
    this.autoFlush = autoFlush;
    this.writer = writer;
    this.queue = new ConcurrentLinkedQueue<String>();
    this.writerThread = null;
    this.stopRequested = new AtomicBoolean(false);
    writerThread = new WriterThread();
    writerThread.start();
    DirectoryServer.registerShutdownListener(this);
  }
  /**
   * The publisher thread is responsible for emptying the queue of log records
   * waiting to published.
   */
  private class WriterThread extends DirectoryThread
  {
    public WriterThread()
    {
      super(name);
    }
    /**
     * the run method of the writerThread. Run until queue is empty
     * AND we've been asked to terminate
     */
    @Override
    public void run()
    {
      while (!stopRequested.get())
      {
        try
        {
          if (queueSemaphore.tryAcquire(10, TimeUnit.SECONDS))
          {
            for (int i = (queueSemaphore.drainPermits() + 1); i > 0; i--)
            {
              String message = queue.poll();
              if (message != null)
              {
                writer.writeRecord(message);
              }
              else
              {
                break;
              }
            }
            if (autoFlush)
            {
              flush();
            }
          }
        }
        catch (InterruptedException ex)
        {
          // Ignore. We'll rerun the loop
          // and presumably fall out.
        }
      }
    }
  }
  /**
   * Write the log record asyncronously.
   *
   * @param record the log record to write.
   */
  public void writeRecord(String record)
  {
    // No writer?  Off to the bit bucket.
    if (writer != null) {
      while (!stopRequested.get())
      {
        // Put request on queue for writer
        queue.add(record);
        queueSemaphore.release();
        break;
      }
    }
  }
  /**
   * {@inheritDoc}
   */
  public void flush()
  {
    writer.flush();
  }
  /**
   * {@inheritDoc}
   */
  public long getBytesWritten()
  {
    return writer.getBytesWritten();
  }
  /**
   * Retrieves the wrapped writer.
   *
   * @return The wrapped writer used by this asyncronous writer.
   */
  public TextWriter getWrappedWriter()
  {
    return writer;
  }
  /**
   * {@inheritDoc}
   */
  public String getShutdownListenerName()
  {
    return "ParallelTextWriter Thread " + name;
  }
  /**
   * {@inheritDoc}
   */
  public void processServerShutdown(Message reason)
  {
    // Don't shutdown the wrapped writer on server shutdown as it
    // might get more write requests before the log publishers are
    // manually shutdown just before the server process exists.
    shutdown(false);
  }
  /**
   * {@inheritDoc}
   */
  public void shutdown()
  {
    shutdown(true);
  }
  /**
   * Releases any resources held by the writer.
   *
   * @param shutdownWrapped If the wrapped writer should be closed as well.
   */
  public void shutdown(boolean shutdownWrapped)
  {
    stopRequested.set(true);
    // Wait for publisher thread to terminate
    while (writerThread != null && writerThread.isAlive()) {
      try {
        // Interrupt the thread if its blocking
        writerThread.interrupt();
        writerThread.join();
      }
      catch (InterruptedException ex) {
        // Ignore; we gotta wait..
      }
    }
    // The writer writerThread SHOULD have drained the queue.
    // If not, handle outstanding requests ourselves,
    // and push them to the writer.
    while (!queue.isEmpty()) {
      String message = queue.poll();
      writer.writeRecord(message);
    }
    // Shutdown the wrapped writer.
    if (shutdownWrapped && writer != null) writer.shutdown();
    DirectoryServer.deregisterShutdownListener(this);
  }
  /**
   * Set the auto flush setting for this writer.
   *
   * @param autoFlush If the writer should flush the buffer after every line.
   */
  public void setAutoFlush(boolean autoFlush)
  {
    this.autoFlush = autoFlush;
  }
}
opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java
@@ -159,6 +159,10 @@
      {
        currentWriter = ((AsyncronousTextWriter) writer).getWrappedWriter();
      }
      else if (writer instanceof ParallelTextWriter)
      {
        currentWriter = ((ParallelTextWriter) writer).getWrappedWriter();
      }
      else
      {
        currentWriter = writer;
@@ -196,6 +200,14 @@
          asyncWriter.shutdown(false);
        }
        if (writer instanceof ParallelTextWriter && !config.isAsynchronous())
        {
          // The asynchronous setting is being turned off.
          ParallelTextWriter asyncWriter = ((ParallelTextWriter) writer);
          writer = mfWriter;
          asyncWriter.shutdown(false);
        }
        if (!(writer instanceof AsyncronousTextWriter)
            && config.isAsynchronous())
        {
@@ -206,6 +218,16 @@
          writer = asyncWriter;
        }
        if (!(writer instanceof ParallelTextWriter)
            && config.isAsynchronous())
        {
          // The asynchronous setting is being turned on.
          ParallelTextWriter asyncWriter = new ParallelTextWriter(
              "Parallel Text Writer for " + config.dn().toNormalizedString(),
              config.isAutoFlush(), mfWriter);
          writer = asyncWriter;
        }
        if ((currentConfig.isAsynchronous() && config.isAsynchronous())
            && (currentConfig.getQueueSize() != config.getQueueSize()))
        {
@@ -303,9 +325,18 @@
      if (config.isAsynchronous())
      {
        this.writer = new AsyncronousTextWriter("Asyncronous Text Writer for "
            + config.dn().toNormalizedString(), config.getQueueSize(), config
            .isAutoFlush(), writer);
        if (config.getQueueSize() > 0)
        {
          this.writer = new AsyncronousTextWriter(
            "Asyncronous Text Writer for " + config.dn().toNormalizedString(),
            config.getQueueSize(), config.isAutoFlush(), writer);
        }
        else
        {
          this.writer = new ParallelTextWriter(
            "Parallel Text Writer for " + config.dn().toNormalizedString(),
            config.isAutoFlush(), writer);
        }
      }
      else
      {