mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.40.2013 82f5228d84de25cd2ea7d99e9880a8c11971e743
Code cleanups.

ECLServerHandler.java:
Moved fields to the top of the file.
Extracted method asDate(), toString(CSN), isEligible(), debugInfo(), getNextMessage(), newECLUpdateMsg().
Renamed getNextEligibleMessageForDomain() to computeNextEligibleMessageForDomain().
Renamed initializeCLSearchFromGenState() to initializeCLSearchFromCookie().
In initializeChangelogDomainCtxts(), used foreach + extracted local variable "latestServerState" (no need to duplicate it) + collapsed one if statement.
Removed commented out code.

ECLServerWriter.java:
Minor code cleanup:
- Removed useless parentheses
- Collapsed if statements
2 files modified
374 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 301 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 73 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -50,6 +50,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a server handler, which handles all interaction with a
@@ -58,6 +59,11 @@
public final class ECLServerHandler extends ServerHandler
{
  private static int UNDEFINED_PHASE = 0;
  /** TODO JNR. */
  public static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
  /**
   * This is a string identifying the operation, provided by the client part of
   * the ECL, used to help interpretation of messages logged.
@@ -108,6 +114,11 @@
  private CSN eligibleCSN;
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private DomainContext[] domainCtxts = new DomainContext[0];
  /**
   * Provides a string representation of this object.
   * @return the string representation.
   */
@@ -172,8 +183,7 @@
      buffer.append("[ [active=").append(active)
          .append("] [rsd=").append(rsd)
          .append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ?
          new Date(nextMsg.getCSN().getTime()).toString():"")
          .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState)
@@ -183,16 +193,15 @@
    }
    /**
     * Get the next message eligible regarding
     * the crossDomain eligible CSN. Put it in the context table.
     * @param opid The operation id.
     * Computes the next message eligible regarding the crossDomain eligible
     * CSN.
     *
     * @param opId The operation id.
     */
    private void getNextEligibleMessageForDomain(String opid)
    private void computeNextEligibleMessageForDomain(String opId)
    {
      if (debugEnabled())
        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDNString() +
          " getNextEligibleMessageForDomain(" + opid+ ") "
          + "ctxt=" + toString());
        debugInfo(opId, "ctxt=" + this);
      assert(nextMsg == null);
      try
@@ -202,22 +211,15 @@
        // not eligible
        if (nextNonEligibleMsg != null)
        {
          boolean hasBecomeEligible =
            (nextNonEligibleMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
          final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for "
              + mh.getBaseDNString()
              + " getNextEligibleMessageForDomain(" + opid + ") "
              + " stored nonEligibleMsg " + nextNonEligibleMsg
              + " has now become eligible regarding "
              + " the eligibleCSN ("+ eligibleCSN
              + " ):" + hasBecomeEligible);
            debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
                + " has now become eligible regarding the eligibleCSN ("
                + eligibleCSN + " ): " + hasBecomeEligible);
          if (hasBecomeEligible)
          {
            // it is now eligible
            nextMsg = nextNonEligibleMsg;
            nextNonEligibleMsg = null;
          }
@@ -226,42 +228,23 @@
        else
        {
          // Here comes a new message !!!
          // non blocking
          UpdateMsg newMsg;
          do {
            newMsg = mh.getNextMessage(false);
            // when the replication changelog is trimmed, the last (latest) chg
            // is left in the db (whatever its age), and we don't want this chg
            // to be returned in the external changelog.
            // So let's check if the chg time is older than the trim date
          } while ((newMsg!=null) &&
              (newMsg.getCSN().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for "
                + mh.getBaseDNString()
                + " getNextEligibleMessageForDomain(" + opid + ") "
                + " got new message : "
                +  " baseDN=[" + mh.getBaseDNString()
                + "] [newMsg=" + newMsg + "]" + dumpState());
          // in non blocking mode, return null when no more msg
          if (newMsg != null)
          final UpdateMsg newMsg = getNextMessage();
          if (newMsg == null)
          {
            boolean isEligible = (newMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
            return;
          }
          if (debugEnabled())
              TRACER.debugInfo(" In ECLServerHandler, for "
                + mh.getBaseDNString()
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getCSN()
                + " " + new Date(newMsg.getCSN().getTime())
                + "] eligibleCSN=[" + eligibleCSN
                + " " + new Date(eligibleCSN.getTime())+"]"
            debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
                + dumpState());
          final boolean isEligible = isEligible(newMsg);
          if (debugEnabled())
            debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
                + toString(eligibleCSN) + "] " + dumpState());
            if (isEligible)
            {
              nextMsg = newMsg;
@@ -272,13 +255,50 @@
            }
          }
        }
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    private boolean isEligible(UpdateMsg msg)
    {
      return msg.getCSN().getTime() <= eligibleCSN.getTime();
    }
    private UpdateMsg getNextMessage()
    {
      while (true)
      {
        final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
        if (newMsg == null)
        { // in non blocking mode, null means no more messages
          return null;
        }
        else if (newMsg.getCSN().getTime() < domainLatestTrimDate)
        {
          // when the replication changelog is trimmed, the last (latest) chg
          // is left in the db (whatever its age), and we don't want this chg
          // to be returned in the external changelog.
          // So let's check if the chg time is older than the trim date
          return newMsg;
        }
      }
    }
    private String toString(CSN csn)
    {
      return csn + " " + asDate(csn);
    }
    private void debugInfo(String opId, String message)
    {
      TRACER.debugInfo("In ECLServerHandler, for baseDN="
          + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
          + ") " + message);
    }
    /**
     * Unregister the handler from the DomainContext ReplicationDomain.
     * @return Whether the handler has been unregistered with success.
@@ -297,11 +317,6 @@
    }
  }
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private DomainContext[] domainCtxts = new DomainContext[0];
  private String clDomCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
@@ -313,10 +328,6 @@
    return buffer.toString();
  }
  private static int UNDEFINED_PHASE = 0;
  private static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
  /**
   * Starts this handler based on a start message received from remote server.
   * @param inECLStartMsg The start msg provided by the remote server.
@@ -509,13 +520,18 @@
  /**
   * Initialize the handler from a provided cookie value.
   * @param crossDomainStartState The provided cookie value.
   * @throws DirectoryException When an error is raised.
   *
   * @param providedCookie
   *          The provided cookie value.
   * @throws DirectoryException
   *           When an error is raised.
   */
  private void initializeCLSearchFromGenState(String crossDomainStartState)
  private void initializeCLSearchFromCookie(String providedCookie)
      throws DirectoryException
  {
    initializeChangelogDomainCtxts(crossDomainStartState, false);
    this.draftCompat = false;
    initializeChangelogDomainCtxts(providedCookie, false);
  }
  /**
@@ -684,12 +700,9 @@
      // Creates the table that will contain the real-time info for each
      // and every domain.
      Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator();
           iter.hasNext();)
      final StringBuilder missingDomains = new StringBuilder();
      for (ReplicationServerDomain rsd : toIterable(rs.getDomainIterator()))
      {
        ReplicationServerDomain rsd = iter.next();
        // skip the 'unreal' changelog domain
        if (rsd == this.replicationServerDomain)
          continue;
@@ -704,11 +717,12 @@
        }
        // skip unused domains
        if (rsd.getLatestServerState().isEmpty())
        final ServerState latestServerState = rsd.getLatestServerState();
        if (latestServerState.isEmpty())
          continue;
        // Creates the new domain context
        DomainContext newDomainCtxt = new DomainContext();
        final DomainContext newDomainCtxt = new DomainContext();
        newDomainCtxt.active = true;
        newDomainCtxt.rsd = rsd;
        newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
@@ -716,7 +730,7 @@
        // Assign the start state for the domain
        if (isPersistent == PERSISTENT_CHANGES_ONLY)
        {
          newDomainCtxt.startState = rsd.getLatestServerState().duplicate();
          newDomainCtxt.startState = latestServerState;
          startStatesFromProvidedCookie.remove(rsd.getBaseDN());
        }
        else
@@ -746,7 +760,7 @@
            // when there is a cookie provided in the request,
            if (newDomainCtxt.startState == null)
            {
              missingDomains += (rsd.getBaseDN() + ":;");
              missingDomains.append(rsd.getBaseDN()).append(":;");
              continue;
            }
            else if (!newDomainCtxt.startState.isEmpty())
@@ -760,7 +774,7 @@
            }
          }
          newDomainCtxt.stopState = rsd.getLatestServerState().duplicate();
          newDomainCtxt.stopState = latestServerState;
        }
        newDomainCtxt.currentState = new ServerState();
@@ -799,9 +813,8 @@
      - if the cookie contains a domain that is even not replicated
      then this case need to be considered here in another loop.
      */
      if (!startStatesFromProvidedCookie.isEmpty())
      if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
      {
        if (allowUnknownDomains)
          for (DN providedDomain : startStatesFromProvidedCookie.keySet())
            if (rs.getReplicationServerDomain(providedDomain) == null)
              // the domain provided in the cookie is not replicated
@@ -833,7 +846,7 @@
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.getNextEligibleMessageForDomain(operationId);
        domainCtxt.computeNextEligibleMessageForDomain(operationId);
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
@@ -1055,53 +1068,6 @@
      closeInitPhase();
    }
    /* TODO: From replication CSN
    //--
    if (startCLMsg.getStartMode()==2)
    {
      if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN()))
        return;
    }
    //--
    if (startCLMsg.getStartMode()==4)
    {
      // to get the CL first and last
      initializeCLDomCtxts(null); // from start
      CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN();
      try
      {
        // to get the CL first and last
        // last rely on the crossDomainEligibleCSN thus must have been
        // computed before
        int[] limits = computeCLLimits(crossDomainEligibleCSN);
        // Send the response
        CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
        session.publish(msg);
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        try
        {
          session.publish(
            new ErrorMsg(
             replicationServer.getServerId(),
             serverId,
             Message.raw(Category.SYNC, Severity.INFORMATION,
                 "Exception raised: " + e.getMessage())));
        }
        catch(IOException ioe)
        {
          // FIXME: close conn ?
        }
      }
      return;
    }
     */
    // Store into domain
    registerIntoDomain();
    if (debugEnabled())
@@ -1116,7 +1082,7 @@
    short requestType = msg.getECLRequestType();
    if (requestType == REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromGenState(msg.getCrossDomainServerState());
      initializeCLSearchFromCookie(msg.getCrossDomainServerState());
    }
    else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
    {
@@ -1231,12 +1197,7 @@
        }
        // Build the ECLUpdateMsg to be returned
        final ECLUpdateMsg change = new ECLUpdateMsg(
            (LDAPUpdateMsg) oldestContext.nextMsg,
            null, // cookie will be set later
            oldestContext.rsd.getBaseDN(),
            0); // changeNumber may be set later
        oldestContext.nextMsg = null;
        final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
        // Default is not to loop, with one exception
        continueLooping = false;
@@ -1250,8 +1211,7 @@
        // Set and test the domain of the oldestChange see if we reached
        // the end of the phase for this domain
        oldestContext.currentState.update(
            change.getUpdateMsg().getCSN());
        oldestContext.currentState.update(change.getUpdateMsg().getCSN());
        if (oldestContext.currentState.cover(oldestContext.stopState)
            || (draftCompat
@@ -1264,7 +1224,7 @@
        {
          // populates the table with the next eligible msg from iDom
          // in non blocking mode, return null when no more eligible msg
          oldestContext.getNextEligibleMessageForDomain(operationId);
          oldestContext.computeNextEligibleMessageForDomain(operationId);
        }
        oldestChange = change;
      }
@@ -1277,21 +1237,14 @@
                  + "looking for the generalized oldest change"));
        for (DomainContext domainCtxt : domainCtxts) {
          domainCtxt.getNextEligibleMessageForDomain(operationId);
          domainCtxt.computeNextEligibleMessageForDomain(operationId);
        }
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        if (oldestContext != null)
        {
          final ECLUpdateMsg change = new ECLUpdateMsg(
              (LDAPUpdateMsg) oldestContext.nextMsg,
              null, // set later
              oldestContext.rsd.getBaseDN(),
              0);
          oldestContext.nextMsg = null; // clean
          oldestContext.currentState.update(
              change.getUpdateMsg().getCSN());
          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
@@ -1330,6 +1283,15 @@
    return oldestChange;
  }
  private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx)
  {
    // cookie will be set later AND changeNumber may be set later
    final ECLUpdateMsg change = new ECLUpdateMsg(
        (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsd.getBaseDN(), 0);
    ctx.nextMsg = null; // clean after use
    return change;
  }
  /**
   * Either retrieves a change number from the DB, or assign a new change number
   * and store in the DB.
@@ -1346,13 +1308,11 @@
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws ChangelogException
  {
    // We also need to check if the CNIndexDB is consistent with
    // the changelogdb.
    // if not, 2 potential reasons
    // a/ : changelog has been purged (trim)let's traverse the CNIndexDB
    // b/ : changelog is late .. let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn
    // in the 2 dbs
    // We also need to check if the CNIndexDB is consistent with the
    // changelogDB. If not, 2 potential reasons:
    // a/ changelog has been purged (trim) let's traverse the CNIndexDB
    // b/ changelog is late ... let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn in the 2 dbs
    // replogCSN : the oldest change from the changelog db
    CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
@@ -1374,20 +1334,17 @@
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() generating change number "
            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
            + csnFromChangelogDb + " timestamps:"
            + new Date(csnFromChangelogDb.getTime()) + " ?older"
            + new Date(csnFromCNIndexDB.getTime()));
        TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
            + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
            + asDate(csnFromChangelogDb) + " ?older"
            + asDate(csnFromCNIndexDB));
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromCNIndexDB, dnFromCNIndexDB))
      {
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " assigning changeNumber=" + currentRecord.getChangeNumber()
              + " to change=" + oldestChange);
          TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
              + currentRecord.getChangeNumber() + " to change=" + oldestChange);
        oldestChange.setChangeNumber(currentRecord.getChangeNumber());
        return true;
@@ -1398,11 +1355,11 @@
      {
        // the change from the changelogDb is older
        // it should have been stored lately
        // let's continue to traverse the changelogdb
        // let's continue to traverse the changelogDB
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber(): will skip "
          TRACER.debugInfo("assignChangeNumber() will skip "
              + csnFromChangelogDb
              + " and read next from the regular changelog.");
              + " and read next change from the regular changelog.");
        return false; // TO BE CHECKED
      }
@@ -1413,18 +1370,17 @@
      try
      {
        // let's traverse the CNIndexDB searching for the change
        // found in the changelogDb.
        // found in the changelogDB
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " will skip " + csnFromCNIndexDB
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
              + " and read next change from the CNIndexDB.");
        isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number has"
              + "skipped to  changeNumber=" + currentRecord.getChangeNumber()
              + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?"
          TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber="
              + currentRecord.getChangeNumber() + " csn="
              + currentRecord.getCSN() + " End of CNIndexDB ?"
              + isEndOfCNIndexDBReached);
      }
      catch (ChangelogException e)
@@ -1439,6 +1395,11 @@
    }
  }
  private Date asDate(CSN csn)
  {
    return new Date(csn.getTime());
  }
  private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2)
  {
    boolean sameDN = dn1.compareTo(dn2) == 0;
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.net.SocketException;
@@ -49,6 +44,11 @@
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a server writer, which is used to send changes to a
 * directory server.
@@ -90,10 +90,10 @@
    this.shutdown = false;
    // Look for the psearch object related to this operation , the one that
    // will ne notified with new entries to be returned.
    ECLWorkflowElement wfe = (ECLWorkflowElement)
    DirectoryServer.getWorkflowElement(
        ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    // will be notified with new entries to be returned.
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
            .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    for (PersistentSearch psearch : wfe.getPersistentSearches())
    {
      if (psearch.getSearchOperation().toString().equals(
@@ -143,7 +143,7 @@
      while (true)
      {
        // wait to be resumed or shutdown
        if ((suspended) && (!shutdown))
        if (suspended && !shutdown)
        {
          synchronized(this)
          {
@@ -165,11 +165,11 @@
    }
    catch (SocketException e)
    {
      // Just ignore the exception and let the thread die as well
      if (session != null) // This will always be the case if a socket exception
                           // has occurred.
      // Just ignore the exception and let the thread die as well.
      // session is always null if a socket exception has occurred.
      if (session != null)
      {
        Message errMessage;
        final Message errMessage;
        if (handler.isDataServer())
        {
          errMessage = ERR_DS_BADLY_DISCONNECTED.get(
@@ -214,10 +214,8 @@
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
   */
  public void doIt()
  throws IOException, InterruptedException
  public void doIt() throws IOException, InterruptedException
  {
    ECLUpdateMsg update = null;
    while (true)
    {
      if (shutdown || suspended)
@@ -225,6 +223,7 @@
        return;
      }
      ECLUpdateMsg update = null;
      try
      {
        handler.refreshEligibleCSN();
@@ -237,33 +236,27 @@
      if (update == null)
      {
        if (handler.getSearchPhase() != 1)
        {
          if (session!=null)
        if (session != null
            && handler.getSearchPhase() != ECLServerHandler.INIT_PHASE)
          {
            // session is null in pusherOnly mode
            // Done is used to end phase 1
            session.publish(new DoneMsg(
                handler.getReplicationServerId(),
                handler.getServerId()));
          }
              handler.getReplicationServerId(), handler.getServerId()));
        }
        if (handler.isPersistent() == StartECLSessionMsg.NON_PERSISTENT)
        {
          // publishing is normally stopped here
        { // publishing is normally stopped here...
          break;
        }
        else
        {
          // except if we are in persistent search
        // ...except if we are in persistent search
          Thread.sleep(200);
        }
      }
      else
      {
        // Publish the update to the remote server using a protocol version he
        // it supports
        // Publish the update to the remote server using a protocol version it
        // supports
        publish(update);
        update = null;
      }
@@ -276,26 +269,22 @@
  public synchronized void shutdownWriter()
  {
    shutdown = true;
    this.notify();
    notify();
  }
  /**
   * Publish a change either on the protocol session or to a persistent search.
   */
  private void publish(ECLUpdateMsg msg)
  throws IOException
  private void publish(ECLUpdateMsg msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo(this.getName() +
          " publishes msg=[" + msg.toString() + "]");
      TRACER.debugInfo(getName() + " publishes msg=[" + msg + "]");
    if (session!=null)
    {
      session.publish(msg);
    }
    else
    {
      if (mypsearch != null)
    else if (mypsearch != null)
      {
        try
        {
@@ -304,13 +293,11 @@
        }
        catch(Exception e)
        {
          Message errMessage =
            ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
              " " +  stackTraceToSingleLineString(e));
        Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
            handler + " " + stackTraceToSingleLineString(e));
          logError(errMessage);
          mypsearch.cancel();
        }
      }
    }
  }
}