| | |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // Not suspended |
| | | doIt(); |
| | | |
| | | if (shutdown) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | suspendWriter(); |
| | | } |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (session!=null) |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | } |
| | | if (replicationServerDomain != null) |
| | | { |
| | | replicationServerDomain.stopServer(handler, false); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private void doIt() throws IOException, InterruptedException |
| | | { |
| | | while (true) |
| | | while (!shutdown && !suspended) |
| | | { |
| | | if (shutdown || suspended) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | ECLUpdateMsg update = null; |
| | | try |
| | | { |
| | | update = handler.takeECLUpdate(); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | logger.traceException(de); |
| | | } |
| | | |
| | | if (update == null) |
| | | final ECLUpdateMsg updateMsg = takeECLUpdate(handler); |
| | | if (updateMsg == null) |
| | | { |
| | | if (session != null && handler.isInitPhaseDone()) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | // Publish the update to the remote server using a protocol version it |
| | | // supports |
| | | publish(update); |
| | | update = null; |
| | | // Publish the update to the remote server using a protocol version it supports |
| | | publish(updateMsg); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Asks the given handler for its next ECL update. |
| | | * |
| | | * @param handler the handler whose {@code takeECLUpdate()} is invoked |
| | | * @return whatever the handler returned, or {@code null} when the call |
| | | * threw a {@code DirectoryException} (the exception is only traced) |
| | | */ |
| | | private ECLUpdateMsg takeECLUpdate(ECLServerHandler handler) |
| | | { |
| | | ECLUpdateMsg nextUpdate = null; |
| | | try |
| | | { |
| | | nextUpdate = handler.takeECLUpdate(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // The failure is reported through the trace log only and null is returned. |
| | | logger.traceException(e); |
| | | } |
| | | return nextUpdate; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the writer. |
| | | */ |
| | |
| | | private void publish(ECLUpdateMsg msg) throws IOException |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace(getName() + " publishes msg=[" + msg + "]"); |
| | | } |
| | | |
| | | if (session != null) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | // Using processAdd() because all ECLUpdateMsgs are adds to the external changelog |
| | | // (even though the underlying changes can be adds, deletes, modifies or modDNs) |
| | | Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg); |
| | | mypsearch.processAdd(eclEntry, -1); |
| | | mypsearch.processAdd(eclEntry); |
| | | } |
| | | catch (Exception e) |
| | | { |