| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.Map.Entry; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | /** |
| | | * The pending changes, keyed and sorted by CSN; pushCommittedChanges() treats |
| | | * the first entry of this map as the oldest change. |
| | | * NOTE(review): two declarations of this field follow (a SortedMap one and a |
| | | * final TreeMap one). This looks like the removed and added rows of a diff |
| | | * shown together - confirm which one is current. firstEntry(), which |
| | | * pushCommittedChanges() calls, is not declared by SortedMap, so that call |
| | | * needs the TreeMap-typed declaration. |
| | | */ |
| | | private SortedMap<CSN, PendingChange> pendingChanges = |
| | | new TreeMap<CSN, PendingChange>(); |
| | | private final TreeMap<CSN, PendingChange> pendingChanges = |
| | | new TreeMap<CSN, PendingChange>(); |
| | | |
| | | /** |
| | | * The {@link CSNGenerator} used to create a new unique CSN |
| | | * for each operation done on the replication domain |
| | | * (putLocalOperation() calls newCSN() on it). |
| | | */ |
| | | private CSNGenerator csnGenerator; |
| | | private final CSNGenerator csnGenerator; |
| | | |
| | | /** |
| | | * The ReplicationDomain that will be used to send UpdateMsg. |
| | | * pushCommittedChanges() also updates this domain's server state with the |
| | | * CSN of committed update messages. |
| | | */ |
| | | private ReplicationDomain domain; |
| | | private final ReplicationDomain domain; |
| | | |
| | | private boolean recoveringOldChanges = false; /* tested by pushCommittedChanges() for each committed, non-synchronization change; presumably true while old changes are being replayed - TODO confirm against the setter, which is not visible here */ |
| | | |
| | |
| | | synchronized CSN putLocalOperation(PluginOperation operation) |
| | | { /* Allocates a new CSN from csnGenerator, registers a PendingChange for the operation under that CSN (with a null third constructor argument), and returns the CSN. */ |
| | | final CSN csn = csnGenerator.newCSN(); |
| | | final PendingChange change = new PendingChange(csn, operation, null); |
| | | pendingChanges.put(csn, change); /* NOTE(review): this unconditional put and the guarded put below look like the old and new rows of a diff shown together, not two puts that both run - confirm against the real source */ |
| | | if (!operation.isSynchronizationOperation()) |
| | | { /* only operations that are not synchronization operations are registered on this path */ |
| | | pendingChanges.put(csn, new PendingChange(csn, operation, null)); |
| | | } |
| | | return csn; /* the CSN is returned whether or not the guarded put ran */ |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | synchronized int pushCommittedChanges() |
| | | synchronized void pushCommittedChanges() |
| | | { /* NOTE(review): an int-returning and a void signature are both listed above, and both "return numSentUpdates;" and "return;" appear below. This looks like the old and new rows of a diff shown together - confirm which side is current. */ |
| | | int numSentUpdates = 0; |
| | | if (pendingChanges.isEmpty()) |
| | | // peek the oldest change; firstEntry() returns null when the map is empty, which is what the null test below relies on |
| | | Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry(); |
| | | if (firstEntry == null) |
| | | { |
| | | return numSentUpdates; |
| | | return; |
| | | } |
| | | |
| | | // peek the oldest CSN (the first key of the sorted map), then look up the change stored under it |
| | | CSN firstCSN = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstCSN); |
| | | PendingChange firstChange = firstEntry.getValue(); |
| | | |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | { /* removes changes from the head of the map for as long as the head is committed; the loop stops at the first uncommitted change or when the map is empty */ |
| | | final PluginOperation op = firstChange.getOp(); |
| | | if (op != null && !op.isSynchronizationOperation()) |
| | | { /* changes with no operation, or with a synchronization operation, are removed without being counted or processed here */ |
| | | numSentUpdates++; |
| | | final LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | | if (!recoveringOldChanges) |
| | | { /* NOTE(review): the next row is empty in this listing, so lines appear to be elided here; do not assume the server-state update below is the body of this branch - confirm against the real source */ |
| | |
| | | domain.getServerState().update(updateMsg.getCSN()); |
| | | } |
| | | } |
| | | pendingChanges.remove(firstCSN); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | firstChange = null; |
| | | } |
| | | else |
| | | { |
| | | // peek the oldest CSN again, now that the previous head has been removed |
| | | firstCSN = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstCSN); |
| | | } |
| | | // false warning: firstEntry cannot be null here, because firstChange is always read from a non-null firstEntry and the loop only runs while firstChange is not null |
| | | pendingChanges.remove(firstEntry.getKey()); |
| | | |
| | | // peek the oldest change again; a null entry means the map is now empty, which sets firstChange to null and ends the loop |
| | | firstEntry = pendingChanges.firstEntry(); |
| | | firstChange = firstEntry != null ? firstEntry.getValue() : null; |
| | | } |
| | | return numSentUpdates; |
| | | } |
| | | |
| | | /** |
| | |
| | | * push all committed local changes to the replicationServer service |
| | | * in a single atomic operation. |
| | | * |
| | | * |
| | | * @param csn The CSN of the update message that must be set as committed. |
| | | * @param msg The message associated to the update. |
| | | * |
| | | * @return The number of pushed updates. |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg) |
| | | synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg) |
| | | { /* Calls commit(csn, msg) and then pushCommittedChanges(). The method is synchronized, so both calls run while holding this object's monitor. NOTE(review): the two signatures above and the two pushCommittedChanges() calls below (one returned, one not) look like the old and new rows of a diff shown together - confirm which side is current. */ |
| | | commit(csn, msg); |
| | | return pushCommittedChanges(); |
| | | pushCommittedChanges(); |
| | | } |
| | | |
| | | /** |