| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.SocketException; |
| | | |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class defines a server writer, which is used to send changes to a |
| | | * directory server. |
| | |
| | | this.suspended = false; |
| | | this.shutdown = false; |
| | | |
| | | // Look for the psearch object related to this operation , the one that |
| | | // will ne notified with new entries to be returned. |
| | | ECLWorkflowElement wfe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement( |
| | | ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); |
| | | // Look for the psearch object related to this operation, the one that |
| | | // will be notified with new entries to be returned. |
| | | ECLWorkflowElement wfe = |
| | | (ECLWorkflowElement) DirectoryServer |
| | | .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); |
| | | for (PersistentSearch psearch : wfe.getPersistentSearches()) |
| | | { |
| | | if (psearch.getSearchOperation().toString().equals( |
| | |
| | | while (true) |
| | | { |
| | | // wait to be resumed or shutdown |
| | | if ((suspended) && (!shutdown)) |
| | | if (suspended && !shutdown) |
| | | { |
| | | synchronized(this) |
| | | { |
| | |
| | | } |
| | | catch (SocketException e) |
| | | { |
| | | // Just ignore the exception and let the thread die as well |
| | | if (session != null) // This will always be the case if a socket exception |
| | | // has occurred. |
| | | // Just ignore the exception and let the thread die as well. |
| | | // session is always non-null if a socket exception has occurred. |
| | | if (session != null) |
| | | { |
| | | Message errMessage; |
| | | final Message errMessage; |
| | | if (handler.isDataServer()) |
| | | { |
| | | errMessage = ERR_DS_BADLY_DISCONNECTED.get( |
| | |
| | | { |
| | | session.close(); |
| | | } |
| | | if (replicationServerDomain!=null) |
| | | if (replicationServerDomain != null) |
| | | replicationServerDomain.stopServer(handler, false); |
| | | } |
| | | } |
| | |
| | | * @throws IOException when raised (connection closure) |
| | | * @throws InterruptedException when raised |
| | | */ |
| | | public void doIt() |
| | | throws IOException, InterruptedException |
| | | public void doIt() throws IOException, InterruptedException |
| | | { |
| | | ECLUpdateMsg update = null; |
| | | while (true) |
| | | { |
| | | if (shutdown || suspended) |
| | |
| | | return; |
| | | } |
| | | |
| | | ECLUpdateMsg update = null; |
| | | try |
| | | { |
| | | handler.refreshEligibleCSN(); |
| | |
| | | |
| | | if (update == null) |
| | | { |
| | | if (handler.getSearchPhase() != 1) |
| | | if (session != null |
| | | && handler.getSearchPhase() != ECLServerHandler.INIT_PHASE) |
| | | { |
| | | if (session!=null) |
| | | { |
| | | // session is null in pusherOnly mode |
| | | // Done is used to end phase 1 |
| | | session.publish(new DoneMsg( |
| | | handler.getReplicationServerId(), |
| | | handler.getServerId())); |
| | | } |
| | | // session is null in pusherOnly mode |
| | | // Done is used to end phase 1 |
| | | session.publish(new DoneMsg( |
| | | handler.getReplicationServerId(), handler.getServerId())); |
| | | } |
| | | |
| | | if (handler.isPersistent() == StartECLSessionMsg.NON_PERSISTENT) |
| | | { |
| | | // publishing is normally stopped here |
| | | { // publishing is normally stopped here... |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // except if we are in persistent search |
| | | Thread.sleep(200); |
| | | } |
| | | |
| | | // ...except if we are in persistent search |
| | | Thread.sleep(200); |
| | | } |
| | | else |
| | | { |
| | | // Publish the update to the remote server using a protocol version he |
| | | // it supports |
| | | // Publish the update to the remote server using a protocol version it |
| | | // supports |
| | | publish(update); |
| | | update = null; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the writer. |
| | |
| | | public synchronized void shutdownWriter() |
| | | { |
| | | shutdown = true; |
| | | this.notify(); |
| | | notify(); |
| | | } |
| | | |
| | | /** |
| | | * Publish a change either on the protocol session or to a persistent search. |
| | | */ |
| | | private void publish(ECLUpdateMsg msg) |
| | | throws IOException |
| | | private void publish(ECLUpdateMsg msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this.getName() + |
| | | " publishes msg=[" + msg.toString() + "]"); |
| | | TRACER.debugInfo(getName() + " publishes msg=[" + msg + "]"); |
| | | |
| | | if (session!=null) |
| | | if (session != null) |
| | | { |
| | | session.publish(msg); |
| | | } |
| | | else |
| | | else if (mypsearch != null) |
| | | { |
| | | if (mypsearch != null) |
| | | try |
| | | { |
| | | try |
| | | { |
| | | Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg); |
| | | mypsearch.processAdd(eclEntry, -1); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | Message errMessage = |
| | | ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() + |
| | | " " + stackTraceToSingleLineString(e)); |
| | | logError(errMessage); |
| | | mypsearch.cancel(); |
| | | } |
| | | Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg); |
| | | mypsearch.processAdd(eclEntry, -1); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get( |
| | | handler + " " + stackTraceToSingleLineString(e)); |
| | | logError(errMessage); |
| | | mypsearch.cancel(); |
| | | } |
| | | } |
| | | } |