mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.41.2015 aaec0227c12c81b76899eb20ff99c947c7715df0
Partial OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

Since this issue will add a new import strategy, the current revision introduces the ImportStrategy interface to tame the ever-growing list of import strategies.
This may be reused for rebuild-index.


ImportStrategy.java: ADDED

SuccessiveAddsImportStrategy.java: ADDED
Extracted from RootContainer.
Implements ImportStrategy.

Importer.java:
Added StrategyImpl static class that implements ImportStrategy.
2 files added
2 files modified
568 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java 49 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 71 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java 226 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/SuccessiveAddsImportStrategy.java 222 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
New file
@@ -0,0 +1,49 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2015 ForgeRock AS
 */
package org.opends.server.backends.pluggable;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
/**
 * The strategy to use for importing LDIF files.
 * <p>
 * Each implementation encapsulates one way of loading the entries described by an
 * {@link LDIFImportConfig} into a {@link RootContainer}.
 */
interface ImportStrategy
{
  /**
   * Imports information from an LDIF file into the supplied root container.
   *
   * @param importConfig
   *          The configuration to use when performing the import.
   * @param rootContainer
   *          The root container into which the entries are imported.
   * @param serverContext
   *          The server context
   * @return Information about the result of the import processing.
   * @throws DirectoryException
   *           If a problem occurs while performing the LDIF import.
   * @see Backend#importLDIF(LDIFImportConfig, ServerContext)
   */
  LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext)
      throws DirectoryException;
}
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -31,6 +31,7 @@
import static org.opends.server.backends.pluggable.DnKeyFormat.*;
import static org.opends.server.backends.pluggable.EntryIDSet.*;
import static org.opends.server.backends.pluggable.SuffixContainer.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -85,6 +86,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
@@ -127,6 +129,46 @@
 */
final class Importer
{
  /**
   * Shim that allows properly constructing an {@link Importer} without polluting
   * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
   */
  static final class StrategyImpl implements ImportStrategy
  {
    private final PluggableBackendCfg backendCfg;
    public StrategyImpl(PluggableBackendCfg backendCfg)
    {
      this.backendCfg = backendCfg;
    }
    @Override
    public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer,
        ServerContext serverContext) throws DirectoryException
    {
      try
      {
        return new Importer(rootContainer, importConfig, backendCfg, serverContext).processImport();
      }
      catch (DirectoryException e)
      {
        logger.traceException(e);
        throw e;
      }
      catch (InitializationException | ConfigException e)
      {
        logger.traceException(e);
        throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
      }
      catch (Exception e)
      {
        logger.traceException(e);
        throw new DirectoryException(getServerErrorResultCode(),
            LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
      }
    }
  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final int TIMER_INTERVAL = 10000;
@@ -853,14 +895,7 @@
    rebuildManager.printStopMessage(startTime);
  }
  /**
   * Import a LDIF using the specified root container.
   *
   * @return A LDIF result.
   * @throws Exception
   *           If the import failed
   */
  public LDIFImportResult processImport() throws Exception
  private LDIFImportResult processImport() throws Exception
  {
    try {
      try
@@ -2989,17 +3024,15 @@
      return timer;
    }
    private int getIndexCount() throws ConfigException, StorageRuntimeException,
        InitializationException
    private int getIndexCount() throws ConfigException, StorageRuntimeException, InitializationException
    {
      switch (rebuildConfig.getRebuildMode())
      {
      case ALL:
        return getTotalIndexCount(cfg);
      case DEGRADED:
        // FIXME: since the storgae is not opened we cannot determine which
        // indexes are degraded. As a workaround, be conservative and assume all
        // indexes need rebuilding.
        // FIXME: since the storage is not opened we cannot determine which indexes are degraded.
        // As a workaround, be conservative and assume all indexes need rebuilding.
        return getTotalIndexCount(cfg);
      default:
        return getRebuildListIndexCount(cfg);
@@ -3023,7 +3056,7 @@
        {
          indexCount += 3;
        }
        else if ("dn2uri".equals(lowerName))
        else if (DN2URI_INDEX_NAME.equals(lowerName))
        {
          indexCount++;
        }
@@ -3057,11 +3090,11 @@
            final String indexType = attrIndexParts[1];
            if (attrIndexParts.length == 2)
            {
              if ("presence".equals(indexType)
                  || "equality".equals(indexType)
                  || "ordering".equals(indexType)
                  || "substring".equals(indexType)
                  || "approximate".equals(indexType))
              if (PRESENCE.toString().equals(indexType)
                  || EQUALITY.toString().equals(indexType)
                  || ORDERING.toString().equals(indexType)
                  || SUBSTRING.toString().equals(indexType)
                  || APPROXIMATE.toString().equals(indexType))
              {
                indexCount++;
              }
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -27,8 +27,6 @@
package org.opends.server.backends.pluggable;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
@@ -37,8 +35,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
@@ -56,20 +52,15 @@
import org.opends.server.backends.pluggable.spi.StorageStatus;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.OpenDsException;
import org.opends.server.types.Operation;
import org.opends.server.types.Privilege;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;
/**
 * Wrapper class for a backend "container". Root container holds all the entry
@@ -78,44 +69,8 @@
 */
public class RootContainer implements ConfigurationChangeListener<PluggableBackendCfg>
{
  /** Logs the progress of the import. */
  private static final class ImportProgress implements Runnable
  {
    private final LDIFReader reader;
    private long previousCount;
    private long previousTime;
    public ImportProgress(LDIFReader reader)
    {
      this.reader = reader;
    }
    @Override
    public void run()
    {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = latestCount - previousCount;
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      if (deltaTime == 0)
      {
        return;
      }
      long entriesRead = reader.getEntriesRead();
      long entriesIgnored = reader.getEntriesIgnored();
      long entriesRejected = reader.getEntriesRejected();
      float rate = 1000f * deltaCount / deltaTime;
      logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
      previousCount = latestCount;
      previousTime = latestTime;
    }
  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final int IMPORT_PROGRESS_INTERVAL = 10000;
  /** The tree storage. */
  private Storage storage;
@@ -129,8 +84,8 @@
  /** The base DNs contained in this root container. */
  private final ConcurrentHashMap<DN, EntryContainer> entryContainers = new ConcurrentHashMap<DN, EntryContainer>();
  /** The cached value of the next entry identifier to be assigned. */
  private AtomicLong nextid = new AtomicLong(1);
  /** Value of the next entryID to be assigned. */
  private AtomicLong nextEntryID = new AtomicLong(1);
  /** The compressed schema manager for this backend. */
  private PersistentCompressedSchema compressedSchema;
@@ -166,161 +121,14 @@
  }
  LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException
  {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy
    return importLDIFWithOnDiskMerge(importConfig, serverContext);
  {
    return getImportStrategy().importLDIF(importConfig, this, serverContext);
  }
  private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
  private ImportStrategy getImportStrategy() throws DirectoryException
  {
    try
    {
      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
      try
      {
        final LDIFReader reader;
        try
        {
          reader = new LDIFReader(importConfig);
        }
        catch (Exception e)
        {
          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
        }
        long importCount = 0;
        final long startTime = System.currentTimeMillis();
        timerService.scheduleAtFixedRate(new ImportProgress(reader),
            IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
        while (true)
        {
          final Entry entry;
          try
          {
            entry = reader.readEntry();
            if (entry == null)
            {
              break;
            }
          }
          catch (LDIFException le)
          {
            if (!le.canContinueReading())
            {
              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
              throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
            }
            continue;
          }
          final DN dn = entry.getName();
          final EntryContainer ec = getEntryContainer(dn);
          if (ec == null)
          {
            final LocalizableMessage m = ERR_LDIF_SKIP.get(dn);
            logger.error(m);
            reader.rejectLastEntry(m);
            continue;
          }
          try
          {
            ec.addEntry(entry, null);
            importCount++;
          }
          catch (DirectoryException e)
          {
            switch (e.getResultCode().asEnum())
            {
            case ENTRY_ALREADY_EXISTS:
              if (importConfig.replaceExistingEntries())
              {
                final Entry oldEntry = ec.getEntry(entry.getName());
                ec.replaceEntry(oldEntry, entry, null);
              }
              else
              {
                reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get());
              }
              break;
            case NO_SUCH_OBJECT:
              reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent()));
              break;
            default:
              // Not sure why it failed.
              reader.rejectLastEntry(e.getMessageObject());
              break;
            }
          }
        }
        final long finishTime = System.currentTimeMillis();
        waitForShutdown(timerService);
        final long importTime = finishTime - startTime;
        float rate = 0;
        if (importTime > 0)
        {
          rate = 1000f * reader.getEntriesRead() / importTime;
        }
        logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(),
            reader.getEntriesRejected(), 0, importTime / 1000, rate);
        return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
      }
      finally
      {
        close();
        // if not already stopped, then stop it
        waitForShutdown(timerService);
      }
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      throw e;
    }
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
    }
    catch (Exception e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
    }
  }
  private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
  {
    timerService.shutdown();
    timerService.awaitTermination(20, TimeUnit.SECONDS);
  }
  private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig, ServerContext serverContext)
      throws DirectoryException
  {
    try
    {
      return new Importer(this, importConfig, config, serverContext).processImport();
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      throw e;
    }
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
    }
    catch (Exception e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(),
          LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
    }
    //TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy
    return new Importer.StrategyImpl(config);
  }
  /**
@@ -391,15 +199,11 @@
   */
  void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws InitializationException
  {
    EntryContainer ec1 = this.entryContainers.get(baseDN);
    // If an entry container for this baseDN is already open we don't allow
    // another to be opened.
    if (ec1 != null)
    EntryContainer ec = this.entryContainers.get(baseDN);
    if (ec != null)
    {
      throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec1.getTreePrefix(), baseDN));
      throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec.getTreePrefix(), baseDN));
    }
    this.entryContainers.put(baseDN, entryContainer);
  }
@@ -432,7 +236,7 @@
      }
    }
    nextid = new AtomicLong(highestID.longValue() + 1);
    nextEntryID = new AtomicLong(highestID.longValue() + 1);
  }
  /**
@@ -470,7 +274,6 @@
      String monitorName = backend.getBackendID() + " Storage";
      monitor = new BackendMonitor(monitorName, this);
    }
    return monitor;
  }
@@ -503,8 +306,7 @@
      // Sort the list in order of priority.
      Collections.sort(trees, new TreePreloadComparator());
      // Preload each tree until we reach the time limit or the cache
      // is filled.
      // Preload each tree until we reach the time limit or the cache is filled.
      try
      {
        throw new UnsupportedOperationException("Not implemented exception");
@@ -638,7 +440,7 @@
   */
  EntryID getNextEntryID()
  {
    return new EntryID(nextid.getAndIncrement());
    return new EntryID(nextEntryID.getAndIncrement());
  }
  /**
@@ -647,7 +449,7 @@
   */
  public void resetNextEntryID()
  {
    nextid.set(1);
    nextEntryID.set(1);
  }
  /** {@inheritDoc} */
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/SuccessiveAddsImportStrategy.java
New file
@@ -0,0 +1,222 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2015 ForgeRock AS
 */
package org.opends.server.backends.pluggable;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.OpenDsException;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;
/**
 * Imports LDIF entries one by one, by calling
 * {@link EntryContainer#addEntry(Entry, org.opends.server.core.AddOperation)}.
 */
final class SuccessiveAddsImportStrategy implements ImportStrategy
{
  /** Logs the progress of the import. */
  private static final class ImportProgress implements Runnable
  {
    private final LDIFReader reader;
    private long previousCount;
    private long previousTime;
    public ImportProgress(LDIFReader reader)
    {
      this.reader = reader;
    }
    @Override
    public void run()
    {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = latestCount - previousCount;
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      if (deltaTime == 0)
      {
        return;
      }
      long entriesRead = reader.getEntriesRead();
      long entriesIgnored = reader.getEntriesIgnored();
      long entriesRejected = reader.getEntriesRejected();
      float rate = 1000f * deltaCount / deltaTime;
      logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
      previousCount = latestCount;
      previousTime = latestTime;
    }
  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final int IMPORT_PROGRESS_INTERVAL = 10000;
  /** {@inheritDoc} */
  @Override
  public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer,
      ServerContext serverContext) throws DirectoryException
  {
    try
    {
      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
      try
      {
        final LDIFReader reader;
        try
        {
          reader = new LDIFReader(importConfig);
        }
        catch (Exception e)
        {
          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
        }
        long importCount = 0;
        final long startTime = System.currentTimeMillis();
        timerService.scheduleAtFixedRate(new ImportProgress(reader),
            IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
        while (true)
        {
          final Entry entry;
          try
          {
            entry = reader.readEntry();
            if (entry == null)
            {
              break;
            }
          }
          catch (LDIFException le)
          {
            if (!le.canContinueReading())
            {
              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
              throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
            }
            continue;
          }
          final DN dn = entry.getName();
          final EntryContainer ec = rootContainer.getEntryContainer(dn);
          if (ec == null)
          {
            final LocalizableMessage m = ERR_LDIF_SKIP.get(dn);
            logger.error(m);
            reader.rejectLastEntry(m);
            continue;
          }
          try
          {
            ec.addEntry(entry, null);
            importCount++;
          }
          catch (DirectoryException e)
          {
            switch (e.getResultCode().asEnum())
            {
            case ENTRY_ALREADY_EXISTS:
              if (importConfig.replaceExistingEntries())
              {
                final Entry oldEntry = ec.getEntry(entry.getName());
                ec.replaceEntry(oldEntry, entry, null);
              }
              else
              {
                reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get());
              }
              break;
            case NO_SUCH_OBJECT:
              reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent()));
              break;
            default:
              // Not sure why it failed.
              reader.rejectLastEntry(e.getMessageObject());
              break;
            }
          }
        }
        final long finishTime = System.currentTimeMillis();
        waitForShutdown(timerService);
        final long importTime = finishTime - startTime;
        float rate = 0;
        if (importTime > 0)
        {
          rate = 1000f * reader.getEntriesRead() / importTime;
        }
        logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(),
            reader.getEntriesRejected(), 0, importTime / 1000, rate);
        return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
      }
      finally
      {
        rootContainer.close();
        // if not already stopped, then stop it
        waitForShutdown(timerService);
      }
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      throw e;
    }
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
    }
    catch (Exception e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
    }
  }
  private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
  {
    timerService.shutdown();
    timerService.awaitTermination(20, TimeUnit.SECONDS);
  }
}