| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.Map.Entry; |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | | private final ECLEnabledDomainPredicate predicate; |
| | | |
| | | /* |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private MultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | */ |
| | | public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) |
| | | { |
| | | this(changelogDB, changelogState, new ECLEnabledDomainPredicate()); |
| | | } |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | | * |
| | | * @param changelogDB |
| | | * the changelogDB |
| | | * @param changelogState |
| | | * the changelog state used for initialization |
| | | * @param predicate |
| | | * tells whether a domain is enabled for the external changelog |
| | | */ |
| | | ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState, |
| | | ECLEnabledDomainPredicate predicate) |
| | | { |
| | | super("Change number indexer"); |
| | | this.changelogDB = changelogDB; |
| | | this.changelogState = changelogState; |
| | | this.predicate = predicate; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) |
| | | { |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (!predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | return; |
| | | } |
| | |
| | | public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | { |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (!predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | return; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the provided baseDN represents a replication domain enabled |
| | | * for the external changelog. |
| | | * <p> |
| | | * This method is a test seam that break the dependency on a static method. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain to check |
| | | * @return true if the provided baseDN is enabled for the external changelog, |
| | | * false if the provided baseDN is disabled for the external changelog |
| | | * or unknown to multimaster replication. |
| | | * @see MultimasterReplication#isECLEnabledDomain(DN) |
| | | */ |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | | return MultimasterReplication.isECLEnabledDomain(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Signals a replica went offline. |
| | | * |
| | | * @param baseDN |
| | |
| | | */ |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | { |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | if (!predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | return; |
| | | } |
| | |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | if (isECLEnabledDomain(baseDN)) |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY); |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, |
| | | domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY)); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | |
| | | { |
| | | for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) |
| | | { |
| | | if (isECLEnabledDomain(baseDN)) |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | // a replica offline message could also be the very last time |