| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | |
| | | * |
| | | * One object of this class is instantiated for each ReplicationDomain. |
| | | */ |
| | | public class PendingChanges |
| | | class PendingChanges |
| | | { |
| | | /** |
| | | * A map used to store the pending changes. |
| | |
| | | * @param csnGenerator The CSNGenerator to use to create new unique CSNs. |
| | | * @param domain The ReplicationDomain that will be used to send UpdateMsg. |
| | | */ |
| | | public PendingChanges( |
| | | CSNGenerator csnGenerator, ReplicationDomain domain) |
| | | PendingChanges(CSNGenerator csnGenerator, ReplicationDomain domain) |
| | | { |
| | | this.csnGenerator = csnGenerator; |
| | | this.domain = domain; |
| | |
| | | * |
| | | * @param csn |
| | | * The CSN of the update to remove. |
| | | * @return The UpdateMsg that was just removed. |
| | | */ |
| | | public synchronized LDAPUpdateMsg remove(CSN csn) |
| | | synchronized void remove(CSN csn) |
| | | { |
| | | return pendingChanges.remove(csn).getMsg(); |
| | | pendingChanges.remove(csn); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return The number of updates currently in the list. |
| | | */ |
| | | public int size() |
| | | int size() |
| | | { |
| | | return pendingChanges.size(); |
| | | } |
| | |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | public synchronized void commit(CSN csn, LDAPUpdateMsg msg) |
| | | private synchronized void commit(CSN csn, LDAPUpdateMsg msg) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | final PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | | { |
| | | throw new NoSuchElementException(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Mark an update message as committed. |
| | | * <p> |
| | | * Looks up the pending change registered under the given CSN and sets its |
| | | * committed flag to {@code true}. |
| | | * |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | * @throws NoSuchElementException if no pending change is stored for the |
| | | * given CSN. |
| | | */ |
| | | public synchronized void commit(CSN csn) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(csn); |
| | | if (curChange == null) |
| | | { |
| | | throw new NoSuchElementException(); |
| | | } |
| | | curChange.setCommitted(true); |
| | | } |
| | | |
| | | /** |
| | | * Add a new UpdateMsg to the pending list from the provided local |
| | | * operation. |
| | | * |
| | |
| | | * be added in the pending list. |
| | | * @return The CSN now associated to the operation. |
| | | */ |
| | | public synchronized CSN putLocalOperation(PluginOperation operation) |
| | | synchronized CSN putLocalOperation(PluginOperation operation) |
| | | { |
| | | CSN csn = csnGenerator.newCSN(); |
| | | PendingChange change = new PendingChange(csn, operation, null); |
| | | final CSN csn = csnGenerator.newCSN(); |
| | | final PendingChange change = new PendingChange(csn, operation, null); |
| | | pendingChanges.put(csn, change); |
| | | return csn; |
| | | } |
| | |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int pushCommittedChanges() |
| | | synchronized int pushCommittedChanges() |
| | | { |
| | | int numSentUpdates = 0; |
| | | if (pendingChanges.isEmpty()) |
| | |
| | | |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | { |
| | | if (firstChange.getOp() != null |
| | | && !firstChange.getOp().isSynchronizationOperation()) |
| | | final PluginOperation op = firstChange.getOp(); |
| | | if (op != null && !op.isSynchronizationOperation()) |
| | | { |
| | | numSentUpdates++; |
| | | LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | | final LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | | if (!recoveringOldChanges) |
| | | { |
| | | domain.publish(updateMsg); |
| | |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int commitAndPushCommittedChanges(CSN csn, |
| | | LDAPUpdateMsg msg) |
| | | synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg) |
| | | { |
| | | commit(csn, msg); |
| | | return pushCommittedChanges(); |
| | |
| | | * @return A boolean indicating if the recovery is completed (false) or must |
| | | * continue (true). |
| | | */ |
| | | public synchronized boolean recoveryUntil(CSN recovered) |
| | | synchronized boolean recoveryUntil(CSN recovered) |
| | | { |
| | | CSN lastLocalChange = domain.getLastLocalChange(); |
| | | final CSN lastLocalChange = domain.getLastLocalChange(); |
| | | if (recovered != null && recovered.isNewerThanOrEqualTo(lastLocalChange)) |
| | | { |
| | | recoveringOldChanges = false; |