| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import java.util.concurrent.TimeoutException; |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | |
| | | /** |
| | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | |
| | | /** |
| | | * The Replicationbroker that will be used to send UpdateMsg. |
| | | * The ReplicationDomain that will be used to send UpdateMsg. |
| | | */ |
| | | private ReplicationBroker broker; |
| | | |
| | | /** |
| | | * The ServerState that will be updated when UpdateMsg are committed. |
| | | */ |
| | | private ServerState state; |
| | | private ReplicationDomain domain; |
| | | |
| | | /** |
| | | * Creates a new PendingChanges using the provided ChangeNumberGenerator. |
| | | * |
| | | * @param changeNumberGenerator The ChangeNumberGenerator to use to create |
| | | * new unique ChangeNumbers. |
| | | * @param broker The Replicationbroker that will be used to send |
| | | * @param domain The ReplicationDomain that will be used to send |
| | | * UpdateMsg. |
| | | * @param state The ServerState that will be updated when UpdateMsg |
| | | * are committed. |
| | | */ |
| | | public PendingChanges( |
| | | ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker, |
| | | ServerState state) |
| | | ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain) |
| | | { |
| | | this.changeNumberGenerator = changeNumberGenerator; |
| | | this.broker = broker; |
| | | this.state = state; |
| | | this.domain = domain; |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return The UpdateMsg that was just removed. |
| | | */ |
| | | public synchronized UpdateMsg remove(ChangeNumber changeNumber) |
| | | public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber) |
| | | { |
| | | return pendingChanges.remove(changeNumber).getMsg(); |
| | | } |
| | |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | public synchronized void commit(ChangeNumber changeNumber, |
| | | UpdateMsg msg) |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | if (curChange == null) |
| | |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | | } |
| | | state.update(firstChangeNumber); |
| | | LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | | try |
| | | { |
| | | domain.publish(updateMsg); |
| | | } catch (TimeoutException ex) { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | Message errorMsg = ERR_DS_ACK_TIMEOUT.get( |
| | | domain.getServiceID(), Long.toString(domain.getAssuredTimeout()), |
| | | updateMsg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | } |
| | | pendingChanges.remove(firstChangeNumber); |
| | | |
| | | if (pendingChanges.isEmpty()) |