| | |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; |
| | | import org.opends.server.api.VirtualAttributeProvider; |
| | | import org.opends.server.backends.ChangelogBackend; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.WorkflowImpl; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.file.FileChangelogDB; |
| | | import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.server.changelog.je.JEChangelogDB; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.types.*; |
| | |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ConfigMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | private final Map<DN, ReplicationServerDomain> baseDNs = |
| | | new HashMap<DN, ReplicationServerDomain>(); |
| | | |
| | | /** The database storing the changes. */ |
| | | private final ChangelogDB changelogDB; |
| | | |
| | | /** The backend that allows searching the changes (external changelog). */ |
| | | private ChangelogBackend changelogBackend; |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private boolean stopListen = false; |
| | | private final ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** To know whether a domain is enabled for the external changelog. */ |
| | | private final ECLEnabledDomainPredicate domainPredicate; |
| | | |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | |
| | | */ |
public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
{
  // A constructor may contain only one explicit constructor invocation, and it
  // must be the first statement. Delegate straight to the three-argument
  // constructor with a fresh shutdown synchronization object and a fresh
  // external changelog domain predicate.
  this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
}
| | | |
/**
 * Creates a new Replication server using the provided configuration entry and shutdown
 * synchronization object.
 * <p>
 * The external changelog domain predicate is a newly created
 * {@link ECLEnabledDomainPredicate}.
 *
 * @param cfg The configuration of this replication server.
 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
 * @throws ConfigException When Configuration is invalid.
 */
public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
{
  this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
}
| | | |
| | | /** |
| | | * Creates a new Replication server using the provided configuration entry, shutdown |
| | | * synchronization object and domain predicate. |
| | | * |
| | | * @param cfg The configuration of this replication server. |
| | | * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. |
| | | * @param predicate Indicates whether a domain is enabled for the external changelog. |
| | | * @throws ConfigException When Configuration is invalid. |
| | | */ |
| | | public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync, |
| | | final ECLEnabledDomainPredicate predicate) throws ConfigException |
| | | { |
| | | this.config = cfg; |
| | | this.dsrsShutdownSync = dsrsShutdownSync; |
| | | this.domainPredicate = predicate; |
| | | ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); |
| | | if (DebugLogger.debugEnabled()) |
| | | { |
| | | TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl |
| | | + " as DB implementation for changelog DB"); |
| | | TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB"); |
| | | } |
| | | this.changelogDB = dbImpl == ReplicationDBImplementation.JE |
| | | ? new JEChangelogDB(this, cfg) |
| | |
| | | initialize(); |
| | | cfg.addChangeListener(this); |
| | | |
| | | // TODO : uncomment to branch changelog backend |
| | | //enableExternalChangeLog(); |
| | | |
| | | localPorts.add(getReplicationPort()); |
| | | |
| | | // Keep track of this new instance |
| | |
| | | registerVirtualAttributeRules(); |
| | | } |
| | | |
| | | /** |
| | | * Enable the external changelog if it is not already enabled. |
| | | * <p> |
| | | * The external changelog is provided by the changelog backend. |
| | | * |
| | | * @throws ConfigException |
| | | * If an error occurs. |
| | | */ |
| | | private void enableExternalChangeLog() throws ConfigException |
| | | { |
| | | if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID)) |
| | | { |
| | | // Backend has already been created and initialized |
| | | // This can occurs in tests |
| | | return; |
| | | } |
| | | try |
| | | { |
| | | changelogBackend = new ChangelogBackend(this, domainPredicate); |
| | | changelogBackend.initializeBackend(); |
| | | try |
| | | { |
| | | DirectoryServer.registerBackend(changelogBackend); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(), |
| | | getExceptionMessage(e))); |
| | | } |
| | | |
| | | registerVirtualAttributeRules(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // TODO : I18N with correct message + what kind of exception should we really throw ? |
| | | // (Directory/Initialization/Config Exception) |
| | | throw new ConfigException(Message.raw("Error when enabling external changelog"), e); |
| | | } |
| | | } |
| | | |
| | | private void shutdownExternalChangelog() |
| | | { |
| | | if (changelogBackend != null) |
| | | { |
| | | DirectoryServer.deregisterBackend(changelogBackend); |
| | | changelogBackend.finalizeBackend(); |
| | | changelogBackend = null; |
| | | } |
| | | deregisterVirtualAttributeRules(); |
| | | } |
| | | |
| | | private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException |
| | | { |
| | | final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>(); |
| | |
| | | return getReplicationServerDomain(baseDN, false); |
| | | } |
| | | |
| | | /** Returns the replicated domain DNs minus the provided set of excluded DNs. */ |
| | | private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException |
| | | { |
| | | Set<DN> domains = null; |
| | | synchronized (baseDNs) |
| | | { |
| | | domains = new HashSet<DN>(baseDNs.keySet()); |
| | | } |
| | | domains.removeAll(excludedBaseDNs); |
| | | return domains; |
| | | } |
| | | |
| | | /** |
| | | * Validate that provided state is coherent with this replication server, |
| | | * when ignoring the provided set of DNs. |
| | | * <p> |
| | | * The state is coherent if and only if it exactly has the set of DNs corresponding to |
| | | * the replication domains. |
| | | * |
| | | * @param state |
| | | * The multi domain state (cookie) to validate. |
| | | * @param ignoredBaseDNs |
| | | * The set of DNs to ignore when validating |
| | | * @throws DirectoryException |
| | | * If the state is not valid |
| | | */ |
| | | public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException |
| | | { |
| | | // TODO : should skip unused domains, where domain.getLatestServerState(); is empty |
| | | final Set<DN> domains = getDomainDNs(ignoredBaseDNs); |
| | | final Set<DN> stateDomains = state.getSnapshot().keySet(); |
| | | final Set<DN> domainsCopy = new HashSet<DN>(domains); |
| | | final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains); |
| | | domainsCopy.removeAll(stateDomains); |
| | | if (!domainsCopy.isEmpty()) |
| | | { |
| | | final StringBuilder missingDomains = new StringBuilder(); |
| | | for (DN dn : domainsCopy) |
| | | { |
| | | missingDomains.append(dn).append(":;"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | missingDomains, "<" + state.toString() + missingDomains + ">")); |
| | | } |
| | | stateDomainsCopy.removeAll(domains); |
| | | if (!stateDomainsCopy.isEmpty()) |
| | | { |
| | | final StringBuilder startState = new StringBuilder(); |
| | | for (DN dn : domains) { |
| | | startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | stateDomainsCopy.toString(), startState)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | |
| | | domain.shutdown(); |
| | | } |
| | | |
| | | // TODO : switch to second method when changelog backend is branched |
| | | shutdownECL(); |
| | | //shutdownExternalChangelog(); |
| | | |
| | | try |
| | | { |