| | |
| | | import java.util.*; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | import org.opends.server.api.VirtualAttributeProvider; |
| | | import org.opends.server.backends.ChangelogBackend; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.WorkflowImpl; |
| | | import org.opends.server.core.networkgroups.NetworkGroup; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | |
| | | import org.opends.server.replication.server.changelog.je.JEChangelogDB; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ConfigMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | /** To know whether a domain is enabled for the external changelog. */ |
| | | private final ECLEnabledDomainPredicate domainPredicate; |
| | | |
| | | /** ID under which the external changelog workflow is created, looked up and deregistered. */ |
| | | private static final String eclWorkflowID = |
| | | "External Changelog Workflow ID"; |
| | | /** Workflow element handed to the external changelog workflow when that workflow is created. */ |
| | | private ECLWorkflowElement eclwe; |
| | | /** |
| | | * Holds the external changelog workflow once it has been created, {@code null} before. |
| | | * It is set with a compare-and-set, so only the caller that installs the value |
| | | * goes on to register the workflow. |
| | | */ |
| | | private final AtomicReference<WorkflowImpl> eclWorkflowImpl = |
| | | new AtomicReference<WorkflowImpl>(); |
| | | |
| | | /** |
| | | * This is required for unit testing, so that we can keep track of all the |
| | | * replication servers which are running in the VM. |
| | |
| | | this.config = cfg; |
| | | this.dsrsShutdownSync = dsrsShutdownSync; |
| | | this.domainPredicate = predicate; |
| | | |
| | | enableExternalChangeLog(); |
| | | ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); |
| | | logger.trace("Using %s as DB implementation for changelog DB", dbImpl); |
| | | if (dbImpl == ReplicationDBImplementation.JE) |
| | |
| | | initialize(); |
| | | cfg.addChangeListener(this); |
| | | |
| | | // TODO : uncomment to branch changelog backend |
| | | //enableExternalChangeLog(); |
| | | |
| | | localPorts.add(getReplicationPort()); |
| | | |
| | | // Keep track of this new instance |
| | |
| | | listenThread = new ReplicationServerListenThread(this); |
| | | listenThread.start(); |
| | | |
| | | // Create the ECL workflow element so that the DS (LDAPReplicationDomain) |
| | | // can find this replication server and actually enable the ECL. |
| | | if (WorkflowImpl.getWorkflow(eclWorkflowID) != null) |
| | | { |
| | | // Already done. Nothing to do |
| | | return; |
| | | } |
| | | eclwe = new ECLWorkflowElement(this); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace("RS " + getMonitorInstanceName() + " successfully initialized"); |
| | |
| | | } catch (IOException e) |
| | | { |
| | | logger.error(ERR_COULD_NOT_BIND_CHANGELOG, getReplicationPort(), e.getMessage()); |
| | | } catch (DirectoryException e) |
| | | { |
| | | //FIXME:DirectoryException is raised by initializeECL => fix err msg |
| | | logger.error(LocalizableMessage.raw( |
| | | "Directory Exception raised by ECL initialization: %s", e.getMessage())); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Enables access to the external changelog (ECL): builds the ECL workflow |
| | | * around the ECL workflow element, registers it with the server and with the |
| | | * default, admin and internal network groups, then registers the virtual |
| | | * attribute rules. |
| | | * <p> |
| | | * Nothing happens when the ECL workflow has already been created, or when |
| | | * another caller is creating it at the same time. |
| | | * |
| | | * @throws DirectoryException when an error occurs. |
| | | */ |
| | | public void enableECL() throws DirectoryException |
| | | { |
| | | final boolean alreadyEnabled = eclWorkflowImpl.get() != null; |
| | | if (alreadyEnabled) |
| | | { |
| | | // The ECL workflow already exists: nothing left to do |
| | | return; |
| | | } |
| | | |
| | | // Build the workflow rooted at the external changelog base DN |
| | | final DN changelogRootDN = |
| | | DN.valueOf(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | final WorkflowImpl candidate = new WorkflowImpl( |
| | | eclWorkflowID, changelogRootDN, eclwe.getWorkflowElementID(), eclwe); |
| | | |
| | | // Only the caller that installs its candidate performs the registration; |
| | | // losing the compare-and-set means the ECL is being enabled elsewhere. |
| | | final boolean installed = eclWorkflowImpl.compareAndSet(null, candidate); |
| | | if (installed) |
| | | { |
| | | // Register the workflow with the server first, then with the network groups |
| | | candidate.register(); |
| | | NetworkGroup.getDefaultNetworkGroup().registerWorkflow(candidate); |
| | | |
| | | // FIXME:ECL is it right to also register the ECL workflow in the admin |
| | | // and internal network groups? |
| | | NetworkGroup.getAdminNetworkGroup().registerWorkflow(candidate); |
| | | NetworkGroup.getInternalNetworkGroup().registerWorkflow(candidate); |
| | | |
| | | registerVirtualAttributeRules(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Enable the external changelog if it is not already enabled. |
| | | * <p> |
| | | * The external changelog is provided by the changelog backend. |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tears down the external changelog (ECL) access: deregisters the ECL |
| | | * workflow from the network groups and from the server, then deregisters |
| | | * and finalizes the ECL workflow element. |
| | | * <p> |
| | | * Both steps are skipped when the corresponding object is no longer |
| | | * registered. |
| | | * <p> |
| | | * NOTE(review): this method does not clear {@code eclWorkflowImpl}, so a |
| | | * later call to {@code enableECL()} on the same instance would return early |
| | | * as if the ECL were still enabled - confirm whether that is intended. |
| | | */ |
| | | private void shutdownECL() |
| | | { |
| | | // Look the workflow up by its ID rather than through eclWorkflowImpl |
| | | WorkflowImpl eclwf = (WorkflowImpl) WorkflowImpl.getWorkflow(eclWorkflowID); |
| | | // do it only if not already done by another RS (unit test case) |
| | | if (eclwf != null) |
| | | { |
| | | // FIXME:ECL should the ECL Workflow be registered in admin and internal |
| | | // network groups? |
| | | NetworkGroup.getInternalNetworkGroup().deregisterWorkflow(eclWorkflowID); |
| | | NetworkGroup.getAdminNetworkGroup().deregisterWorkflow(eclWorkflowID); |
| | | |
| | | NetworkGroup.getDefaultNetworkGroup().deregisterWorkflow(eclWorkflowID); |
| | | |
| | | deregisterVirtualAttributeRules(); |
| | | |
| | | // Remove the workflow from the server only after the network groups let go of it |
| | | eclwf.deregister(); |
| | | eclwf.finalizeWorkflow(); |
| | | } |
| | | |
| | | // The eclwe field is overwritten with whatever element is currently |
| | | // registered under this name; it becomes null when none is registered. |
| | | eclwe = (ECLWorkflowElement) DirectoryServer |
| | | .getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe != null) |
| | | { |
| | | DirectoryServer.deregisterWorkflowElement(eclwe); |
| | | eclwe.finalizeWorkflowElement(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | |
| | | domain.shutdown(); |
| | | } |
| | | |
| | | // TODO : switch to second method when changelog backend is branched |
| | | shutdownECL(); |
| | | //shutdownExternalChangelog(); |
| | | shutdownExternalChangelog(); |
| | | |
| | | try |
| | | { |