| | |
| | | private final SortedMap<CSN, FakeOperation> replayOperations = |
| | | new TreeMap<CSN, FakeOperation>(); |
| | | |
| | | /** |
| | | * The isolation policy that this domain is going to use. |
| | | * This field describes the behavior of the domain when an update is |
| | | * attempted and the domain could not connect to any Replication Server. |
| | | * Possible values are accept-updates or deny-updates, but other values |
| | | * may be added in the future. |
| | | */ |
| | | private IsolationPolicy isolationPolicy; |
| | | |
| | | /** |
| | | * The DN of the configuration entry of this domain. |
| | | */ |
| | | private final DN configDn; |
| | | private ReplicationDomainCfg config; |
| | | private ExternalChangelogDomain eclDomain; |
| | | |
| | | /** |
| | |
| | | private static final int FRACTIONAL_BECOME_NO_OP = 3; |
| | | |
| | | /** |
| | | * This configuration boolean indicates if this ReplicationDomain should log |
| | | * CSNs. |
| | | */ |
| | | private boolean logCSN = false; |
| | | |
| | | /** |
| | | * This configuration integer indicates the time the domain keeps the |
| | | * historical information necessary to solve conflicts.<br> |
| | | * When a change stored in the historical part of the user entry has a date |
| | | * (from its replication CSN) older than this delay, it is a candidate to be |
| | | * purged. |
| | | */ |
| | | private long histPurgeDelayInMilliSec = 0; |
| | | |
| | | /** |
| | | * The last CSN purged in this domain. It allows purging to continue |
| | | * seamlessly from one purge run (task run) to the next one. Its value is 0 |
| | | * when the server starts. |
| | |
| | | * @throws ConfigException In case of invalid configuration. |
| | | */ |
| | | public LDAPReplicationDomain(ReplicationDomainCfg configuration, |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | throws ConfigException |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException |
| | | { |
| | | super(configuration.getBaseDN(), |
| | | configuration.getServerId(), |
| | | configuration.getInitializationWindowSize()); |
| | | |
| | | // Read the configuration parameters. |
| | | Set<String> replicationServers = configuration.getReplicationServer(); |
| | | |
| | | int window = configuration.getWindowSize(); |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | long heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | |
| | | this.isolationPolicy = configuration.getIsolationPolicy(); |
| | | this.configDn = configuration.dn(); |
| | | this.logCSN = configuration.isLogChangenumber(); |
| | | this.config = configuration; |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | this.histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | // Get assured configuration |
| | | readAssuredConfig(configuration, false); |
| | |
| | | // register as an AlertGenerator |
| | | DirectoryServer.registerAlertGenerator(this); |
| | | |
| | | startPublishService(replicationServers, window, heartbeatInterval, |
| | | configuration.getChangetimeHeartbeatInterval()); |
| | | startPublishService(configuration); |
| | | } |
| | | |
| | | /** |
| | |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | | |
| | | // FIXME should the next call use the initWindow parameter rather than the |
| | | // instance variable? |
| | | super.initializeRemote(target, requestorID, initTask, this.initWindow); |
| | | } |
| | | |
| | |
| | | */ |
| | | private boolean brokerIsConnected() |
| | | { |
| | | final IsolationPolicy isolationPolicy = config.getIsolationPolicy(); |
| | | if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) |
| | | { |
| | | // This policy implies that we always accept updates. |
| | |
| | | // Note that a failed non-replication operation might not have a change |
| | | // number. |
| | | CSN curCSN = OperationContext.getCSN(op); |
| | | if (curCSN != null && logCSN) |
| | | if (curCSN != null && config.isLogChangenumber()) |
| | | { |
| | | op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), |
| | | "replicationCSN", curCSN)); |
| | |
| | | { |
| | | // If the base entry does not exist, save the generation |
| | | // ID in the config entry |
| | | result = runSaveGenerationId(configDn, generationId); |
| | | result = runSaveGenerationId(config.dn(), generationId); |
| | | } |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | |
| | | { |
| | | // if the base entry does not exist look for the generationID |
| | | // in the config entry. |
| | | search = conn.processSearch(configDn.toString(), |
| | | search = conn.processSearch(config.dn().toString(), |
| | | SearchScope.BASE_OBJECT, |
| | | DereferencePolicy.DEREF_ALWAYS, 0, 0, false, |
| | | filter,attributes); |
| | |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ReplicationDomainCfg configuration) |
| | | { |
| | | isolationPolicy = configuration.getIsolationPolicy(); |
| | | logCSN = configuration.isLogChangenumber(); |
| | | histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | changeConfig( |
| | | configuration.getReplicationServer(), |
| | | configuration.getWindowSize(), |
| | | configuration.getHeartbeatInterval(), |
| | | (byte)configuration.getGroupId()); |
| | | this.config = configuration; |
| | | changeConfig(configuration); |
| | | |
| | | // Read assured + fractional configuration and each time reconnect if needed |
| | | readAssuredConfig(configuration, true); |
| | |
| | | @Override |
| | | public DN getComponentEntryDN() |
| | | { /* Yields the DN of this domain's configuration entry. NOTE(review): two return statements follow (the configDn field vs. config.dn()); only one can be live - confirm which one is current. */ |
| | | return configDn; |
| | | return config.dn(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn); |
| | | DN eclConfigEntryDN = DN.decode("cn=external changeLog," + config.dn()); |
| | | if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) |
| | | { |
| | | DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null); |
| | |
| | | // unit test cases |
| | | try |
| | | { |
| | | DN configDn = config.dn(); |
| | | if (DirectoryServer.getConfigHandler().entryExists(configDn)) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the purge delay (in ms) for the historical information stored |
| | | * in entries to solve conflicts for this domain, that is, the minimum time |
| | | * (in ms) that the domain keeps the historical information necessary to |
| | | * solve conflicts. |
| | | * <p> |
| | | * NOTE(review): two return statements appear in the body. The first returns |
| | | * the histPurgeDelayInMilliSec field; the second recomputes the same |
| | | * quantity as config.getConflictsHistoricalPurgeDelay() * 60 * 1000. Only |
| | | * one of them can be live; confirm which one is current. |
| | | * |
| | | * @return the purge delay, in milliseconds. |
| | | */ |
| | | public long getHistoricalPurgeDelay() |
| | | { |
| | | return histPurgeDelayInMilliSec; |
| | | return config.getConflictsHistoricalPurgeDelay() * 60 * 1000; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); |
| | | lastCSNPurgedFromHist = entryHist.getOldestCSN(); |
| | | entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec); |
| | | entryHist.setPurgeDelay(getHistoricalPurgeDelay()); |
| | | Attribute attr = entryHist.encodeAndPurge(); |
| | | count += entryHist.getLastPurgedValuesCount(); |
| | | List<Modification> mods = new LinkedList<Modification>(); |