mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
14.37.2009 5ec0cb08889c9f1a24fd4cc8b139dcdb942dd92a
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -66,6 +66,7 @@
  private short protocolVersion = -1;
  private boolean suspended;
  private boolean shutdown;
  private PersistentSearch mypsearch;
  /**
   * Create a ServerWriter.
@@ -80,7 +81,7 @@
  {
    super(session, (short)-1, handler, replicationServerDomain);
    setName("Replication ECL Writer Thread for op:" +
    setName("Replication ECL Writer Thread for operation " +
        handler.getOperationId());
    this.session = session;
@@ -90,6 +91,21 @@
    this.protocolVersion = handler.getProtocolVersion();
    this.suspended = false;
    this.shutdown = false;
    // Look for the psearch object related to this operation , the one that
    // will ne notified with new entries to be returned.
    ECLWorkflowElement wfe = (ECLWorkflowElement)
    DirectoryServer.getWorkflowElement(
        ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    for (PersistentSearch psearch : wfe.getPersistentSearches())
    {
      if (psearch.getSearchOperation().toString().equals(
          handler.getOperationId()))
      {
        mypsearch = psearch;
        break;
      }
    }
  }
  /**
@@ -139,18 +155,14 @@
        }
        if (shutdown)
        {
          return;
        }
        // Not suspended
        doIt();
        if (shutdown)
        {
          return;
        }
        suspendWriter();
      }
    }
@@ -186,7 +198,7 @@
  }
  /**
   * Loop gettting changes from the domain and publishing them either to
   * Loop geting changes from the domain and publishing them either to
   * the provided session or to the ECL session interface.
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
@@ -213,6 +225,7 @@
      {
        try
        {
          handler.refreshEligibleCN();
          update = handler.takeECLUpdate();
        }
        catch(DirectoryException de)
@@ -273,7 +286,8 @@
  throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " publishes msg=[" + msg.toString() + "]");
      TRACER.debugInfo(this.getName() +
          " publishes msg=[" + msg.toString() + "]");
    if (session!=null)
    {
@@ -281,17 +295,12 @@
    }
    else
    {
      ECLWorkflowElement wfe = (ECLWorkflowElement)
      DirectoryServer.getWorkflowElement(
          ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
      // Notify persistent searches.
      for (PersistentSearch psearch : wfe.getPersistentSearches())
      if (mypsearch != null)
      {
        try
        {
          Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg);
          psearch.processAdd(eclEntry, -1);
          mypsearch.processAdd(eclEntry, -1);
        }
        catch(Exception e)
        {
@@ -299,6 +308,7 @@
            ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
              " " +  stackTraceToSingleLineString(e));
          logError(errMessage);
          mypsearch.cancel();
        }
      }
    }