| | |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.messages.Category; |
| | |
| | | * is not updated too early. |
| | | */ |
| | | private final PendingChanges pendingChanges; |
| | | private final AtomicReference<RSUpdater> rsUpdater = |
| | | new AtomicReference<RSUpdater>(null); |
| | | |
| | | /** |
| | | * It contains the updates that were done on other servers, transmitted |
| | |
| | | * The thread that periodically saves the ServerState of this |
| | | * LDAPReplicationDomain in the database. |
| | | */ |
| | | private class ServerStateFlush extends DirectoryThread |
| | | private class ServerStateFlush extends DirectoryThread |
| | | { |
| | | protected ServerStateFlush() |
| | | { |
| | |
| | | { |
| | | done = false; |
| | | |
| | | while (!shutdown) |
| | | while (!isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | |
| | | catch (InterruptedException e) |
| | | { |
| | | // Thread interrupted: check for shutdown. |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | state.save(); |
| | |
| | | private class RSUpdater extends DirectoryThread |
| | | { |
| | | private final CSN startCSN; |
| | | /** |
| | | * Used to communicate that the current thread computation needs to |
| | | * shut down. |
| | | */ |
| | | private AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | |
| | | protected RSUpdater(CSN replServerMaxCSN) |
| | | { |
| | |
| | | { |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get()); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | |
| | | */ |
| | | try |
| | | { |
| | | if (buildAndPublishMissingChanges(startCSN, broker)) |
| | | if (buildAndPublishMissingChanges(startCSN, broker, shutdown)) |
| | | { |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | logError(DEBUG_CHANGES_SENT.get()); |
| | | synchronized(replayOperations) |
| | | { |
| | | replayOperations.clear(); |
| | |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()); |
| | | logError(message); |
| | | logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString())); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()); |
| | | logError(message); |
| | | logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString())); |
| | | } |
| | | finally |
| | | { |
| | | broker.setRecoveryRequired(false); |
| | | // RSUpdater thread has finished its work, let's remove it from memory |
| | | // so another RSUpdater thread can be started if needed. |
| | | rsUpdater.compareAndSet(this, null); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | // In addition to the inherited behaviour, this override raises the local |
| | | // AtomicBoolean shutdown flag. That flag is handed to |
| | | // buildAndPublishMissingChanges(), which polls it and returns false as |
| | | // soon as it is set. |
| | | @Override |
| | | public void initiateShutdown() |
| | | { |
| | | // Raise the local flag before delegating to the parent implementation. |
| | | this.shutdown.set(true); |
| | | super.initiateShutdown(); |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | if (!shutdown) |
| | | { |
| | | shutdown = true; |
| | | final RSUpdater rsUpdater = this.rsUpdater.get(); |
| | | if (rsUpdater != null) |
| | | { |
| | | rsUpdater.initiateShutdown(); |
| | | } |
| | | |
| | | // stop the thread in charge of flushing the ServerState. |
| | | if (flushThread != null) |
| | | { |
| | | flushThread.initiateShutdown(); |
| | | synchronized (flushThread) |
| | | { |
| | | flushThread.notify(); |
| | |
| | | { |
| | | pendingChanges.setRecovering(true); |
| | | broker.setRecoveryRequired(true); |
| | | new RSUpdater(replServerMaxCSN).start(); |
| | | final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN); |
| | | if (this.rsUpdater.compareAndSet(null, rsUpdater)) |
| | | { |
| | | rsUpdater.start(); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) |
| | |
| | | * The CSN where we need to start the search |
| | | * @param session |
| | | * The session to use to publish the changes |
| | | * @param shutdown |
| | | * whether the current run must be stopped |
| | | * @return A boolean indicating the success of the operation. |
| | | * @throws Exception |
| | | * if an Exception happens during the search. |
| | | */ |
| | | public boolean buildAndPublishMissingChanges(CSN startCSN, |
| | | ReplicationBroker session) throws Exception |
| | | ReplicationBroker session, AtomicBoolean shutdown) throws Exception |
| | | { |
| | | // Trim the changes in replayOperations that are older than the startCSN. |
| | | synchronized (replayOperations) |
| | |
| | | Iterator<CSN> it = replayOperations.keySet().iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | if (shutdown.get()) |
| | | { |
| | | return false; |
| | | } |
| | | if (it.next().isNewerThan(startCSN)) |
| | | { |
| | | break; |
| | |
| | | CSN currentStartCSN = startCSN; |
| | | do |
| | | { |
| | | if (shutdown.get()) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | lastRetrievedChange = null; |
| | | // We can't do the search in one go because we need to store the results |
| | | // so that we are sure we send the operations in order and because the |
| | |
| | | |
| | | // Publish and remove all the changes from the replayOperations list |
| | | // that are older than the endCSN. |
| | | List<FakeOperation> opsToSend = new LinkedList<FakeOperation>(); |
| | | final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>(); |
| | | synchronized (replayOperations) |
| | | { |
| | | Iterator<FakeOperation> itOp = replayOperations.values().iterator(); |
| | | while (itOp.hasNext()) |
| | | { |
| | | if (shutdown.get()) |
| | | { |
| | | return false; |
| | | } |
| | | FakeOperation fakeOp = itOp.next(); |
| | | if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check |
| | | || !state.cover(fakeOp.getCSN())) |
| | | || !state.cover(fakeOp.getCSN()) |
| | | // do not look for replay operations in the future |
| | | || endCSN.isNewerThan(now())) |
| | | { |
| | | break; |
| | | } |
| | |
| | | |
| | | for (FakeOperation opToSend : opsToSend) |
| | | { |
| | | if (shutdown.get()) |
| | | { |
| | | return false; |
| | | } |
| | | session.publishRecovery(opToSend.generateMessage()); |
| | | } |
| | | opsToSend.clear(); |
| | | |
| | | if (lastRetrievedChange != null) |
| | | { |
| | | currentStartCSN = lastRetrievedChange; |
| | |
| | | { |
| | | currentStartCSN = endCSN; |
| | | } |
| | | |
| | | } while (pendingChanges.recoveryUntil(lastRetrievedChange) |
| | | && op.getResultCode().equals(ResultCode.SUCCESS)); |
| | | |
| | | return op.getResultCode().equals(ResultCode.SUCCESS); |
| | | } |
| | | |
| | | private static CSN now() |
| | | { |
| | | return new CSN(TimeThread.getTime(), 0, 0); |
| | | } |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromCSN based on the historical |
| | |
| | | catch (InterruptedException e) |
| | | { |
| | | // Thread interrupted: check for shutdown. |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | |