From 54cb24af9165680e648071929bbbd271b9af1e4e Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 14:52:14 +0000
Subject: [PATCH] - EXPERIMENTAL Parallel Text Writer implementation.

---
 opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java                        |   37 +++++
 opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml |   10 +
 opends/src/server/org/opends/server/loggers/ParallelTextWriter.java                            |  257 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 299 insertions(+), 5 deletions(-)

diff --git a/opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml
index 95f1bab..f18115d 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/FileBasedAccessLogPublisherConfiguration.xml
@@ -23,7 +23,7 @@
   ! CDDL HEADER END
   !
   !
-  !      Copyright 2007-2008 Sun Microsystems, Inc.
+  !      Copyright 2007-2009 Sun Microsystems, Inc.
   ! -->
 <adm:managed-object name="file-based-access-log-publisher"
   plural-name="file-based-access-log-publishers"
@@ -74,6 +74,12 @@
       The maximum number of log records that can be stored in the
       asynchronous queue.
     </adm:synopsis>
+    <adm:description>
+      Setting the queue size to zero activates parallel log writer
+      implementation which has no queue size limit and as such the
+      parallel log writer should only be used on a very well tuned
+      server configuration to avoid potential out of memory errors.
+    </adm:description>
     <adm:requires-admin-action>
       <adm:other>
         <adm:synopsis>
@@ -88,7 +94,7 @@
       </adm:defined>
     </adm:default-behavior>
     <adm:syntax>
-      <adm:integer lower-limit="1" />
+      <adm:integer lower-limit="0" />
     </adm:syntax>
     <adm:profile name="ldap">
       <ldap:attribute>
diff --git a/opends/src/server/org/opends/server/loggers/ParallelTextWriter.java b/opends/src/server/org/opends/server/loggers/ParallelTextWriter.java
new file mode 100644
index 0000000..ac5f11b
--- /dev/null
+++ b/opends/src/server/org/opends/server/loggers/ParallelTextWriter.java
@@ -0,0 +1,257 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.loggers;
+
+
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.messages.Message;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.api.ServerShutdownListener;
+import org.opends.server.core.DirectoryServer;
+
+
+
+/**
+ * A Text Writer which writes log records asynchronously to
+ * character-based stream. Note that this implementation is
+ * parallel unbound ie there is no queue size cap imposed.
+ */
+public class ParallelTextWriter
+    implements ServerShutdownListener, TextWriter
+{
+  /**
+   * The wrapped Text Writer.
+   */
+  private final TextWriter writer;
+
+  /** Queue to store unpublished records. */
+  private final ConcurrentLinkedQueue<String> queue;
+
+  private final Semaphore queueSemaphore = new Semaphore(0, false);
+
+  private String name;
+  private AtomicBoolean stopRequested;
+  private WriterThread writerThread;
+
+  private boolean autoFlush;
+
+  /**
+   * Construct a new ParallelTextWriter wrapper.
+   *
+   * @param name      the name of the thread.
+   * @param autoFlush indicates if the underlying writer should be flushed
+   *                  after the queue is flushed.
+   * @param writer    a character stream used for output.
+   */
+  public ParallelTextWriter(String name, boolean autoFlush, TextWriter writer)
+  {
+    this.name = name;
+    this.autoFlush = autoFlush;
+    this.writer = writer;
+
+    this.queue = new ConcurrentLinkedQueue<String>();
+    this.writerThread = null;
+    this.stopRequested = new AtomicBoolean(false);
+
+    writerThread = new WriterThread();
+    writerThread.start();
+
+    DirectoryServer.registerShutdownListener(this);
+  }
+
+  /**
+   * The publisher thread is responsible for emptying the queue of log records
+   * waiting to published.
+   */
+  private class WriterThread extends DirectoryThread
+  {
+    public WriterThread()
+    {
+      super(name);
+    }
+    /**
+     * the run method of the writerThread. Run until queue is empty
+     * AND we've been asked to terminate
+     */
+    @Override
+    public void run()
+    {
+      while (!stopRequested.get())
+      {
+        try
+        {
+          if (queueSemaphore.tryAcquire(10, TimeUnit.SECONDS))
+          {
+            for (int i = (queueSemaphore.drainPermits() + 1); i > 0; i--)
+            {
+              String message = queue.poll();
+              if (message != null)
+              {
+                writer.writeRecord(message);
+              }
+              else
+              {
+                break;
+              }
+            }
+            if (autoFlush)
+            {
+              flush();
+            }
+          }
+        }
+        catch (InterruptedException ex)
+        {
+          // Ignore. We'll rerun the loop
+          // and presumably fall out.
+        }
+      }
+    }
+  }
+
+  /**
+   * Write the log record asyncronously.
+   *
+   * @param record the log record to write.
+   */
+  public void writeRecord(String record)
+  {
+    // No writer?  Off to the bit bucket.
+    if (writer != null) {
+      while (!stopRequested.get())
+      {
+        // Put request on queue for writer
+        queue.add(record);
+        queueSemaphore.release();
+        break;
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void flush()
+  {
+    writer.flush();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public long getBytesWritten()
+  {
+    return writer.getBytesWritten();
+  }
+
+  /**
+   * Retrieves the wrapped writer.
+   *
+   * @return The wrapped writer used by this asyncronous writer.
+   */
+  public TextWriter getWrappedWriter()
+  {
+    return writer;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public String getShutdownListenerName()
+  {
+    return "ParallelTextWriter Thread " + name;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void processServerShutdown(Message reason)
+  {
+    // Don't shutdown the wrapped writer on server shutdown as it
+    // might get more write requests before the log publishers are
+    // manually shutdown just before the server process exists.
+    shutdown(false);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void shutdown()
+  {
+    shutdown(true);
+  }
+
+  /**
+   * Releases any resources held by the writer.
+   *
+   * @param shutdownWrapped If the wrapped writer should be closed as well.
+   */
+  public void shutdown(boolean shutdownWrapped)
+  {
+    stopRequested.set(true);
+
+    // Wait for publisher thread to terminate
+    while (writerThread != null && writerThread.isAlive()) {
+      try {
+        // Interrupt the thread if its blocking
+        writerThread.interrupt();
+        writerThread.join();
+      }
+      catch (InterruptedException ex) {
+        // Ignore; we gotta wait..
+      }
+    }
+
+    // The writer writerThread SHOULD have drained the queue.
+    // If not, handle outstanding requests ourselves,
+    // and push them to the writer.
+    while (!queue.isEmpty()) {
+      String message = queue.poll();
+      writer.writeRecord(message);
+    }
+
+    // Shutdown the wrapped writer.
+    if (shutdownWrapped && writer != null) writer.shutdown();
+
+    DirectoryServer.deregisterShutdownListener(this);
+  }
+
+  /**
+   * Set the auto flush setting for this writer.
+   *
+   * @param autoFlush If the writer should flush the buffer after every line.
+   */
+  public void setAutoFlush(boolean autoFlush)
+  {
+    this.autoFlush = autoFlush;
+  }
+}
diff --git a/opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java b/opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java
index 5d145ee..7de3401 100644
--- a/opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java
+++ b/opends/src/server/org/opends/server/loggers/TextAccessLogPublisher.java
@@ -159,6 +159,10 @@
       {
         currentWriter = ((AsyncronousTextWriter) writer).getWrappedWriter();
       }
+      else if (writer instanceof ParallelTextWriter)
+      {
+        currentWriter = ((ParallelTextWriter) writer).getWrappedWriter();
+      }
       else
       {
         currentWriter = writer;
@@ -196,6 +200,14 @@
           asyncWriter.shutdown(false);
         }
 
+        if (writer instanceof ParallelTextWriter && !config.isAsynchronous())
+        {
+          // The asynchronous setting is being turned off.
+          ParallelTextWriter asyncWriter = ((ParallelTextWriter) writer);
+          writer = mfWriter;
+          asyncWriter.shutdown(false);
+        }
+
         if (!(writer instanceof AsyncronousTextWriter)
             && config.isAsynchronous())
         {
@@ -206,6 +218,16 @@
           writer = asyncWriter;
         }
 
+        if (!(writer instanceof ParallelTextWriter)
+            && config.isAsynchronous())
+        {
+          // The asynchronous setting is being turned on.
+          ParallelTextWriter asyncWriter = new ParallelTextWriter(
+              "Parallel Text Writer for " + config.dn().toNormalizedString(),
+              config.isAutoFlush(), mfWriter);
+          writer = asyncWriter;
+        }
+
         if ((currentConfig.isAsynchronous() && config.isAsynchronous())
             && (currentConfig.getQueueSize() != config.getQueueSize()))
         {
@@ -303,9 +325,18 @@
 
       if (config.isAsynchronous())
       {
-        this.writer = new AsyncronousTextWriter("Asyncronous Text Writer for "
-            + config.dn().toNormalizedString(), config.getQueueSize(), config
-            .isAutoFlush(), writer);
+        if (config.getQueueSize() > 0)
+        {
+          this.writer = new AsyncronousTextWriter(
+            "Asyncronous Text Writer for " + config.dn().toNormalizedString(),
+            config.getQueueSize(), config.isAutoFlush(), writer);
+        }
+        else
+        {
+          this.writer = new ParallelTextWriter(
+            "Parallel Text Writer for " + config.dn().toNormalizedString(),
+            config.isAutoFlush(), writer);
+        }
       }
       else
       {

--
Gitblit v1.10.0