| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011 ForgeRock AS |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | |
| | | /** |
| | |
| | | */ |
| | | private ReplicationDomain domain; |
| | | |
| | | private boolean recoveringOldChanges = false; |
| | | |
| | | /** |
| | | * Creates a new PendingChanges using the provided ChangeNumberGenerator. |
| | | * |
| | |
| | | public synchronized void commit(ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | _commit(changeNumber, msg); |
| | | } |
| | | /** |
| | | * Mark an update message as committed. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | public void _commit(ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | if (curChange == null) |
| | | { |
| | | throw new NoSuchElementException(); |
| | | } |
| | | curChange.setCommitted(true); |
| | | |
| | | curChange.setMsg(msg); |
| | | } |
| | | |
| | |
| | | */ |
| | | public synchronized ChangeNumber putLocalOperation(PluginOperation operation) |
| | | { |
| | | return _putLocalOperation(operation); |
| | | } |
| | | /** |
| | | * Add a new UpdateMsg to the pending list from the provided local |
| | | * operation. |
| | | * |
| | | * @param operation The local operation for which an UpdateMsg must |
| | | * be added in the pending list. |
| | | * @return The ChangeNumber now associated to the operation. |
| | | */ |
| | | public ChangeNumber _putLocalOperation(PluginOperation operation) |
| | | { |
| | | ChangeNumber changeNumber; |
| | | |
| | | changeNumber = changeNumberGenerator.newChangeNumber(); |
| | | ChangeNumber changeNumber = changeNumberGenerator.newChangeNumber(); |
| | | PendingChange change = new PendingChange(changeNumber, operation, null); |
| | | pendingChanges.put(changeNumber, change); |
| | | return changeNumber; |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public synchronized int pushCommittedChanges() |
| | | { |
| | | return _pushCommittedChanges(); |
| | | } |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public int _pushCommittedChanges() |
| | | { |
| | | int numSentUpdates = 0; |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | return numSentUpdates; |
| | | } |
| | | |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | { |
| | | if ((firstChange.getOp() != null ) && |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | if (firstChange.getOp() != null |
| | | && !firstChange.getOp().isSynchronizationOperation()) |
| | | { |
| | | numSentUpdates++; |
| | | LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int commitAndPushCommittedChanges( |
| | | ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | ChangeNumber changeNumber, LDAPUpdateMsg msg) |
| | | { |
| | | _commit(changeNumber, msg); |
| | | return _pushCommittedChanges(); |
| | | commit(changeNumber, msg); |
| | | return pushCommittedChanges(); |
| | | } |
| | | |
| | | private boolean recoveringOldChanges = false; |
| | | /** |
| | | * Set the PendingChangesList structure in a mode where it is |
| | | * waiting for the RS to receive all the previous changes to |
| | |
| | | * @return A boolean indicating if the recovery is completed (false) |
| | | * or must continue (true). |
| | | */ |
| | | |
| | | public synchronized boolean recoveryUntil(ChangeNumber recovered) |
| | | { |
| | | ChangeNumber lastLocalChange = domain.getLastLocalChange(); |
| | | |
| | | if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange))) |
| | | if (recovered != null && recovered.newerOrEquals(lastLocalChange)) |
| | | { |
| | | recoveringOldChanges = false; |
| | | } |