mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.40.2013 82f5228d84de25cd2ea7d99e9880a8c11971e743
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -27,11 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.net.SocketException;
@@ -49,6 +44,11 @@
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a server writer, which is used to send changes to a
 * directory server.
@@ -89,11 +89,11 @@
    this.suspended = false;
    this.shutdown = false;
    // Look for the psearch object related to this operation , the one that
    // will ne notified with new entries to be returned.
    ECLWorkflowElement wfe = (ECLWorkflowElement)
    DirectoryServer.getWorkflowElement(
        ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    // Look for the psearch object related to this operation, the one that
    // will be notified with new entries to be returned.
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
            .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    for (PersistentSearch psearch : wfe.getPersistentSearches())
    {
      if (psearch.getSearchOperation().toString().equals(
@@ -143,7 +143,7 @@
      while (true)
      {
        // wait to be resumed or shutdown
        if ((suspended) && (!shutdown))
        if (suspended && !shutdown)
        {
          synchronized(this)
          {
@@ -165,11 +165,11 @@
    }
    catch (SocketException e)
    {
      // Just ignore the exception and let the thread die as well
      if (session != null) // This will always be the case if a socket exception
                           // has occurred.
      // Just ignore the exception and let the thread die as well.
      // session is always null if a socket exception has occurred.
      if (session != null)
      {
        Message errMessage;
        final Message errMessage;
        if (handler.isDataServer())
        {
          errMessage = ERR_DS_BADLY_DISCONNECTED.get(
@@ -203,7 +203,7 @@
      {
        session.close();
      }
      if (replicationServerDomain!=null)
      if (replicationServerDomain != null)
        replicationServerDomain.stopServer(handler, false);
    }
  }
@@ -214,10 +214,8 @@
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
   */
  public void doIt()
  throws IOException, InterruptedException
  public void doIt() throws IOException, InterruptedException
  {
    ECLUpdateMsg update = null;
    while (true)
    {
      if (shutdown || suspended)
@@ -225,6 +223,7 @@
        return;
      }
      ECLUpdateMsg update = null;
      try
      {
        handler.refreshEligibleCSN();
@@ -237,38 +236,32 @@
      if (update == null)
      {
        if (handler.getSearchPhase() != 1)
        if (session != null
            && handler.getSearchPhase() != ECLServerHandler.INIT_PHASE)
        {
          if (session!=null)
          {
            // session is null in pusherOnly mode
            // Done is used to end phase 1
            session.publish(new DoneMsg(
                handler.getReplicationServerId(),
                handler.getServerId()));
          }
          // session is null in pusherOnly mode
          // Done is used to end phase 1
          session.publish(new DoneMsg(
              handler.getReplicationServerId(), handler.getServerId()));
        }
        if (handler.isPersistent() == StartECLSessionMsg.NON_PERSISTENT)
        {
          // publishing is normally stopped here
        { // publishing is normally stopped here...
          break;
        }
        else
        {
          // except if we are in persistent search
          Thread.sleep(200);
        }
        // ...except if we are in persistent search
        Thread.sleep(200);
      }
      else
      {
        // Publish the update to the remote server using a protocol version he
        // it supports
        // Publish the update to the remote server using a protocol version it
        // supports
        publish(update);
        update = null;
      }
    }
}
  }
  /**
   * Shutdown the writer.
@@ -276,40 +269,34 @@
  public synchronized void shutdownWriter()
  {
    shutdown = true;
    this.notify();
    notify();
  }
  /**
   * Publish a change either on the protocol session or to a persistent search.
   */
  private void publish(ECLUpdateMsg msg)
  throws IOException
  private void publish(ECLUpdateMsg msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo(this.getName() +
          " publishes msg=[" + msg.toString() + "]");
      TRACER.debugInfo(getName() + " publishes msg=[" + msg + "]");
    if (session!=null)
    if (session != null)
    {
      session.publish(msg);
    }
    else
    else if (mypsearch != null)
    {
      if (mypsearch != null)
      try
      {
        try
        {
          Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg);
          mypsearch.processAdd(eclEntry, -1);
        }
        catch(Exception e)
        {
          Message errMessage =
            ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
              " " +  stackTraceToSingleLineString(e));
          logError(errMessage);
          mypsearch.cancel();
        }
        Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg);
        mypsearch.processAdd(eclEntry, -1);
      }
      catch (Exception e)
      {
        Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
            handler + " " + stackTraceToSingleLineString(e));
        logError(errMessage);
        mypsearch.cancel();
      }
    }
  }