| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * Thread responsible for inserting replicated changes into the ChangeNumber |
| | | * Index DB (CNIndexDB for short). Only changes older than the medium |
| | | * consistency point are inserted in the CNIndexDB. As a consequence this class |
| | | * is also responsible for maintaining the medium consistency point. |
| | | * Index DB (CNIndexDB for short). |
| | | * <p> |
| | | * Only changes older than the medium consistency point are inserted in the |
| | | * CNIndexDB. As a consequence this class is also responsible for maintaining |
| | | * the medium consistency point (indirectly through an |
| | | * {@code ECLMultiDomainDBCursor}). |
| | | */ |
| | | public class ChangeNumberIndexer extends DirectoryThread |
| | | { |
| | |
| | | /* |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | * updates |
| | | * 2) many updates can happen concurrently. |
| | | */ |
| | | /** |
| | | * Holds the cross domain medium consistency Replication Update Vector for the |
| | | * current replication server, also known as the previous cookie. |
| | | * <p> |
| | | * Stores the value of the cookie before the change currently processed is |
| | | * inserted in the DB. After insert, it is updated with the CSN of the change |
| | | * currently processed (thus becoming the "current" cookie just before the |
| | | * change is returned. |
| | | * <p> |
| | | * Note: This object is only updated by changes/updates. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names - medium consistency RUV</a> |
| | | */ |
| | | private final MultiDomainServerState mediumConsistencyRUV = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | | * heartbeat notifications, or offline notifications. Data is held for each |
| | | * serverId cross domain. |
| | |
| | | * <p> |
| | | * Note: This object is updated by both heartbeats and changes/updates. |
| | | */ |
| | | private final MultiDomainServerState lastAliveCSNs = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); |
| | | |
| | | /** Note: This object is updated by replica offline messages. */ |
| | | private final MultiDomainServerState replicasOffline = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Cursor across all the replicaDBs for all the replication domains. It is |
| | |
| | | } |
| | | |
| | | /** |
| | | * Restores in memory data needed to build the CNIndexDB, including the medium |
| | | * consistency point. |
| | | * Restores in memory data needed to build the CNIndexDB. In particular, |
| | | * initializes the changes cursor to the medium consistency point. |
| | | */ |
| | | private void initialize() throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = |
| | | changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | |
| | | initializeLastAliveCSNs(domainDB); |
| | | initializeNextChangeCursor(domainDB); |
| | | initializeOfflineReplicas(); |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException |
| | | { |
| | | final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN(); |
| | | |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** Returns a cookie initialised with the newest CSN for each replica. */ |
| | | private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState(); |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the mediumConsistencyRUV from DB |
| | | mediumConsistencyRUV.update( |
| | | new MultiDomainServerState(newestRecord.getPreviousCookie())); |
| | | // Do not update with the newestRecord CSN |
| | | // as it will be used for a sanity check later in the same method |
| | | final CSN newestCsn = newestRecord.getCSN(); |
| | | for (DN baseDN : changelogState.getDomainToServerIds().keySet()) |
| | | { |
| | | cookieWithNewestCSN.update(baseDN, newestCsn); |
| | | } |
| | | } |
| | | return cookieWithNewestCSN; |
| | | } |
| | | |
| | | // initialize the DB cursor and the last seen updates |
| | | // to ensure the medium consistency CSN can move forward |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) |
| | | { |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | } |
| | | } |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, |
| | | domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY)); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | if (record instanceof ReplicaOfflineMsg) |
| | | { |
| | | // ignore: replica offline messages are never stored in the CNIndexDB |
| | | nextChangeForInsertDBCursor.next(); |
| | | record = nextChangeForInsertDBCursor.getRecord(); |
| | | } |
| | | |
| | | // sanity check: ensure that when initializing the cursors at the previous |
| | | // cookie, the next change we find is the newest record in the CNIndexDB |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get( |
| | | newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | } |
| | | // Now we can update the mediumConsistencyRUV |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | private void initializeOfflineReplicas() |
| | | { |
| | | final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); |
| | | for (DN baseDN : offlineReplicas) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | private CSN oldestPossibleCSN(int serverId) |
| | |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB. |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord( |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn)); |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg); |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB() |
| | | .addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie(); |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | } |
| | | catch (InterruptedException ignored) |
| | |
| | | * the change number of the newly added entry. It will be greater |
| | | * than zero for entries added to the change number index and less |
| | | * than or equal to zero for entries added to any replica DB |
| | | * @param cookieString |
| | | * a string representing the cookie of the newly added entry. This is |
| | | * only meaningful for entries added to the change number index |
| | | * @param cookie |
| | | * the cookie of the newly added entry. This is only meaningful for |
| | | * entries added to the change number index |
| | | * @param msg |
| | | * the update message of the newly added entry |
| | | * @throws ChangelogException |
| | | * If a problem occurs while notifying of the newly added entry. |
| | | */ |
| | | protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber, |
| | | String cookieString, UpdateMsg msg) throws ChangelogException |
| | | MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException |
| | | { |
| | | ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg); |
| | | ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | TRACER.debugError(msg.toString()); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | * from the medium consistency RUV). |
| | | */ |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |