| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | import java.io.BufferedOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | /** |
| | | * This class should be used as a base for Replication implementations. |
| | | * <p> |
| | |
| | | * to be able to correlate all the coming back acks to the original |
| | | * operation. |
| | | */ |
| | | private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs = |
| | | new ConcurrentHashMap<ChangeNumber, UpdateMsg>(); |
| | | private final Map<CSN, UpdateMsg> waitingAckMsgs = |
| | | new ConcurrentHashMap<CSN, UpdateMsg>(); |
| | | |
| | | |
| | | /** |
| | |
| | | private final ServerState state; |
| | | |
| | | /** |
| | | * The generator that will be used to generate {@link ChangeNumber} |
| | | * The generator that will be used to generate {@link CSN} |
| | | * for this domain. |
| | | */ |
| | | private final ChangeNumberGenerator generator; |
| | | private final CSNGenerator generator; |
| | | |
| | | private final Object eclIncludesLock = new Object(); |
| | | private final Map<Integer, Set<String>> eclIncludesByServer = |
| | |
| | | private final Object sessionLock = new Object(); |
| | | |
| | | /** |
| | | * Returns the {@link ChangeNumberGenerator} that will be used to |
| | | * generate {@link ChangeNumber} for this domain. |
| | | * Returns the {@link CSNGenerator} that will be used to |
| | | * generate {@link CSN} for this domain. |
| | | * |
| | | * @return The {@link ChangeNumberGenerator} that will be used to |
| | | * generate {@link ChangeNumber} for this domain. |
| | | * @return The {@link CSNGenerator} that will be used to |
| | | * generate {@link CSN} for this domain. |
| | | */ |
| | | public ChangeNumberGenerator getGenerator() |
| | | public CSNGenerator getGenerator() |
| | | { |
| | | return generator; |
| | | } |
| | |
| | | this.serverID = serverID; |
| | | this.initWindow = initWindow; |
| | | this.state = new ServerState(); |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | } |
| | |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.state = serverState; |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | } |
| | |
| | | else if (msg instanceof UpdateMsg) |
| | | { |
| | | update = (UpdateMsg) msg; |
| | | generator.adjust(update.getChangeNumber()); |
| | | generator.adjust(update.getCSN()); |
| | | } |
| | | else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | |
| | | */ |
| | | private void receiveAck(AckMsg ack) |
| | | { |
| | | UpdateMsg update; |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | CSN csn = ack.getCSN(); |
| | | |
| | | // Remove the message for pending ack list (this may already make the thread |
| | | // that is waiting for the ack be aware of its reception) |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | |
| | | // Signal waiting thread ack has been received |
| | | if (update != null) |
| | |
| | | is used the ack is sent without error at replay flag. |
| | | */ |
| | | processUpdateDone(msg, null); |
| | | state.update(msg.getChangeNumber()); |
| | | state.update(msg.getCSN()); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (rsGroupId == groupId) |
| | | { |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getChangeNumber()); |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Mark the error in the ack |
| | |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | |
| | | // Add the assured message to the list of update that are waiting for acks |
| | | waitingAckMsgs.put(msg.getChangeNumber(), msg); |
| | | waitingAckMsgs.put(msg.getCSN(), msg); |
| | | } |
| | | } |
| | | |
| | |
| | | long startTime = System.currentTimeMillis(); |
| | | synchronized (msg) |
| | | { |
| | | ChangeNumber cn = msg.getChangeNumber(); |
| | | while (waitingAckMsgs.containsKey(cn)) |
| | | CSN csn = msg.getCSN(); |
| | | while (waitingAckMsgs.containsKey(csn)) |
| | | { |
| | | try |
| | | { |
| | |
| | | remove the update from the wait list, log the timeout error and |
| | | also update assured monitoring counters |
| | | */ |
| | | UpdateMsg update; |
| | | update = waitingAckMsgs.remove(cn); |
| | | UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | |
| | | if (update != null) |
| | | { |
| | |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message cn: " + cn + |
| | | " and replication servceID: " + baseDN + " after " + |
| | | assuredTimeout + " ms."); |
| | | throw new TimeoutException("No ack received for message csn: " |
| | | + csn + " and replication servceID: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | } else |
| | | { |
| | | // Ack received just before timeout limit: we can exit |
| | |
| | | { |
| | | // Publish the update |
| | | broker.publish(msg); |
| | | state.update(msg.getChangeNumber()); |
| | | state.update(msg.getCSN()); |
| | | numSentUpdates.incrementAndGet(); |
| | | } |
| | | |
| | |
| | | UpdateMsg update; |
| | | synchronized (this) |
| | | { |
| | | update = new UpdateMsg(generator.newChangeNumber(), msg); |
| | | update = new UpdateMsg(generator.newCSN(), msg); |
| | | /* |
| | | If assured replication is configured, this will prepare blocking |
| | | mechanism. If assured replication is disabled, this returns |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the ChangeNUmber of the last Change that was fully processed |
| | | * by this ReplicationDomain. |
| | | * Returns the CSN of the last Change that was fully processed by this |
| | | * ReplicationDomain. |
| | | * |
| | | * @return The ChangeNUmber of the last Change that was fully processed |
| | | * by this ReplicationDomain. |
| | | * @return The CSN of the last Change that was fully processed by this |
| | | * ReplicationDomain. |
| | | */ |
| | | public ChangeNumber getLastLocalChange() |
| | | public CSN getLastLocalChange() |
| | | { |
| | | return state.getChangeNumber(serverID); |
| | | return state.getCSN(serverID); |
| | | } |
| | | } |