| | |
| | | import org.opends.server.replication.protocol.DoneMsg; |
| | | import org.opends.server.replication.protocol.ECLUpdateMsg; |
| | | import org.opends.server.replication.protocol.Session; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; |
| | |
| | | * This class defines a server writer, which is used to send changes to a |
| | | * directory server. |
| | | */ |
| | | public class ECLServerWriter extends ServerWriter |
| | | class ECLServerWriter extends ServerWriter |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private boolean suspended; |
| | | private volatile boolean shutdown; |
| | | private PersistentSearch mypsearch; |
| | | private final PersistentSearch mypsearch; |
| | | |
| | | /** |
| | | * Create a ServerWriter. |
| | |
| | | * @param replicationServerDomain the ReplicationServerDomain of this |
| | | * ServerWriter. |
| | | */ |
| | | public ECLServerWriter(Session session, ECLServerHandler handler, |
| | | ECLServerWriter(Session session, ECLServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(session, handler, replicationServerDomain); |
| | | super(session, handler, replicationServerDomain, new DSRSShutdownSync()); |
| | | |
| | | setName("Replication ECL Writer Thread for operation " + |
| | | handler.getOperationId()); |
| | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | this.suspended = false; |
| | | this.shutdown = false; |
| | | this.mypsearch = findPersistentSearch(handler); |
| | | } |
| | | |
| | | // Look for the psearch object related to this operation, the one that |
| | | // will be notified with new entries to be returned. |
| | | ECLWorkflowElement wfe = |
| | | (ECLWorkflowElement) DirectoryServer |
| | | .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); |
| | | /** |
| | | * Look for the persistent search object related to this operation, the one |
| | | * that will be notified with new entries to be returned. |
| | | * <p> |
| | | * A persistent search matches when the string form of its search operation |
| | | * equals the operation id reported by the handler. |
| | | * |
| | | * @param handler the handler whose operation id selects the persistent search |
| | | * @return the matching persistent search, or {@code null} when none of the |
| | | * ECL workflow element's persistent searches matches |
| | | */ |
| | | private PersistentSearch findPersistentSearch(ECLServerHandler handler) |
| | | { |
| | | ECLWorkflowElement wfe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); |
| | | for (PersistentSearch psearch : wfe.getPersistentSearches()) |
| | | { |
| | | if (psearch.getSearchOperation().toString().equals( |
| | | handler.getOperationId())) |
| | | { |
| | | mypsearch = psearch; |
| | | break; |
| | | return psearch; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | * waiting for the startCLSessionMsg. Then it may be |
| | | * suspended between 2 jobs, each job being a separate search. |
| | | * <p> |
| | | * Sets the {@code suspended} flag while holding this object's monitor. |
| | | */ |
| | | public synchronized void suspendWriter() |
| | | private synchronized void suspendWriter() |
| | | { |
| | | suspended = true; |
| | | } |
| | |
| | | /** |
| | | * Resume the writer. |
| | | */ |
| | | public synchronized void resumeWriter() |
| | | synchronized void resumeWriter() |
| | | { |
| | | suspended = false; |
| | | notify(); |
| | |
| | | * @throws IOException when raised (connection closure) |
| | | * @throws InterruptedException when raised |
| | | */ |
| | | public void doIt() throws IOException, InterruptedException |
| | | private void doIt() throws IOException, InterruptedException |
| | | { |
| | | while (true) |
| | | { |
| | |
| | | /** |
| | | * Shutdown the writer. |
| | | */ |
| | | public synchronized void shutdownWriter() |
| | | synchronized void shutdownWriter() |
| | | { |
| | | shutdown = true; |
| | | notify(); |