| | |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | |
| | | * in progress and not yet committed in the database. |
| | | * |
| | | * It is used to make sure that operations are sent to the Replication |
| | | * Server in the order defined by their ChangeNumber. |
| | | * Server in the order defined by their CSN. |
| | | * It is also used to update the ServerState at the appropriate time. |
| | | * |
| | | * One object of this class is instantiated for each ReplicationDomain. |
| | |
| | | /** |
| | | * A map used to store the pending changes. |
| | | */ |
| | | private SortedMap<ChangeNumber, PendingChange> pendingChanges = |
| | | new TreeMap<ChangeNumber, PendingChange>(); |
| | | private SortedMap<CSN, PendingChange> pendingChanges = |
| | | new TreeMap<CSN, PendingChange>(); |
| | | |
| | | /** |
| | | * The ChangeNumberGenerator to use to create new unique ChangeNumbers |
| | | * The {@link CSNGenerator} to use to create new unique CSNs |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private CSNGenerator csnGenerator; |
| | | |
| | | /** |
| | | * The ReplicationDomain that will be used to send UpdateMsg. |
| | |
| | | private boolean recoveringOldChanges = false; |
| | | |
| | | /** |
| | | * Creates a new PendingChanges using the provided ChangeNumberGenerator. |
| | | * Creates a new PendingChanges using the provided CSNGenerator. |
| | | * |
| | | * @param changeNumberGenerator The ChangeNumberGenerator to use to create |
| | | * new unique ChangeNumbers. |
| | | * @param domain The ReplicationDomain that will be used to send |
| | | * UpdateMsg. |
| | | * @param csnGenerator The CSNGenerator to use to create new unique CSNs. |
| | | * @param domain The ReplicationDomain that will be used to send UpdateMsg. |
| | | */ |
| | | public PendingChanges( |
| | | ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain) |
| | | CSNGenerator csnGenerator, ReplicationDomain domain) |
| | | { |
| | | this.changeNumberGenerator = changeNumberGenerator; |
| | | this.csnGenerator = csnGenerator; |
| | | this.domain = domain; |
| | | } |
| | | |
| | | /** |
| | | * Remove and return an update from the pending changes list. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update to remove. |
| | | * |
| | | * @param csn |
| | | * The CSN of the update to remove. |
| | | * @return The UpdateMsg that was just removed. |
| | | */ |
| | | public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber) |
| | | public synchronized LDAPUpdateMsg remove(CSN csn) |
| | | { |
| | | return pendingChanges.remove(changeNumber).getMsg(); |
| | | return pendingChanges.remove(csn).getMsg(); |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Mark an update message as committed. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | public synchronized void commit(ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | public synchronized void commit(CSN csn, LDAPUpdateMsg msg) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | | { |
| | | throw new NoSuchElementException(); |
| | |
| | | /** |
| | | * Mark an update message as committed. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | */ |
| | | public synchronized void commit(ChangeNumber changeNumber) |
| | | public synchronized void commit(CSN csn) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | | { |
| | | throw new NoSuchElementException(); |
| | |
| | | * |
| | | * @param operation The local operation for which an UpdateMsg must |
| | | * be added in the pending list. |
| | | * @return The ChangeNumber now associated to the operation. |
| | | * @return The CSN now associated to the operation. |
| | | */ |
| | | public synchronized ChangeNumber putLocalOperation(PluginOperation operation) |
| | | public synchronized CSN putLocalOperation(PluginOperation operation) |
| | | { |
| | | ChangeNumber changeNumber = changeNumberGenerator.newChangeNumber(); |
| | | PendingChange change = new PendingChange(changeNumber, operation, null); |
| | | pendingChanges.put(changeNumber, change); |
| | | return changeNumber; |
| | | CSN csn = csnGenerator.newCSN(); |
| | | PendingChange change = new PendingChange(csn, operation, null); |
| | | pendingChanges.put(csn, change); |
| | | return csn; |
| | | } |
| | | |
| | | /** |
| | |
| | | return numSentUpdates; |
| | | } |
| | | |
| | | // peek the oldest changeNumber |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | // peek the oldest CSN |
| | | CSN firstCSN = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstCSN); |
| | | |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | { |
| | |
| | | { |
| | | // do not push updates until the RS catches up. |
| | | // @see #setRecovering(boolean) |
| | | domain.getServerState().update(updateMsg.getChangeNumber()); |
| | | domain.getServerState().update(updateMsg.getCSN()); |
| | | } |
| | | } |
| | | pendingChanges.remove(firstChangeNumber); |
| | | pendingChanges.remove(firstCSN); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | // peek the oldest changeNumber |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | // peek the oldest CSN |
| | | firstCSN = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstCSN); |
| | | } |
| | | } |
| | | return numSentUpdates; |
| | |
| | | * in a single atomic operation. |
| | | * |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | * @param msg The message associated to the update. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int commitAndPushCommittedChanges( |
| | | ChangeNumber changeNumber, LDAPUpdateMsg msg) |
| | | public synchronized int commitAndPushCommittedChanges(CSN csn, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | commit(changeNumber, msg); |
| | | commit(csn, msg); |
| | | return pushCommittedChanges(); |
| | | } |
| | | |
| | | /** |
| | | * Set the PendingChangesList structure in a mode where it is |
| | | * waiting for the RS to receive all the previous changes to |
| | | * be sent before starting to process the changes normally. |
| | | * In this mode, The Domain does not publish the changes from |
| | | * the pendingChanges because there are older changes that |
| | | * need to be published before. |
| | | * Set the PendingChangesList structure in a mode where it is waiting for the |
| | | * RS to receive all the previous changes to be sent before starting to |
| | | * process the changes normally. In this mode, The Domain does not publish the |
| | | * changes from the pendingChanges because there are older changes that need |
| | | * to be published before. |
| | | * |
| | | * @param b The recovering status that must be set. |
| | | * @param recovering |
| | | * The recovering status that must be set. |
| | | */ |
| | | public void setRecovering(boolean b) |
| | | public void setRecovering(boolean recovering) |
| | | { |
| | | recoveringOldChanges = b; |
| | | recoveringOldChanges = recovering; |
| | | } |
| | | |
| | | /** |
| | | * Allows to update the recovery situation by comparing the ChangeNumber of |
| | | * the last change that was sent to the ReplicationServer with the |
| | | * ChangeNumber of the last operation that was taken out of the |
| | | * PendingChanges list. |
| | | * If he two match then the recovery is completed and normal procedure can |
| | | * restart. Otherwise the RSUpdate thread must continue to look for |
| | | * older changes and no changes can be committed from the pendingChanges list. |
| | | * Allows to update the recovery situation by comparing the CSN of the last |
| | | * change that was sent to the ReplicationServer with the CSN of the last |
| | | * operation that was taken out of the PendingChanges list. If the two match |
| | | * then the recovery is completed and normal procedure can restart. Otherwise |
| | | * the RSUpdate thread must continue to look for older changes and no changes |
| | | * can be committed from the pendingChanges list. |
| | | * |
| | | * @param recovered The ChangeNumber of the last change that was published |
| | | * to the ReplicationServer. |
| | | * |
| | | * @return A boolean indicating if the recovery is completed (false) |
| | | * or must continue (true). |
| | | * @param recovered |
| | | * The CSN of the last change that was published to the |
| | | * ReplicationServer. |
| | | * @return A boolean indicating if the recovery is completed (false) or must |
| | | * continue (true). |
| | | */ |
| | | public synchronized boolean recoveryUntil(ChangeNumber recovered) |
| | | public synchronized boolean recoveryUntil(CSN recovered) |
| | | { |
| | | ChangeNumber lastLocalChange = domain.getLastLocalChange(); |
| | | CSN lastLocalChange = domain.getLastLocalChange(); |
| | | if (recovered != null && recovered.newerOrEquals(lastLocalChange)) |
| | | { |
| | | recoveringOldChanges = false; |