| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | import org.opends.messages.Category; |
| | |
| | | * and which can start receiving updates. |
| | | * <p> |
| | | * When updates are received the Replication Service calls the |
| | | * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method. |
| | | * {@link #processUpdate(UpdateMsg)} method. |
| | | * ReplicationDomain implementation should implement the appropriate code |
| | | * for replaying the update on the local repository. |
| | | * When fully done the subclass must call the |
| | |
| | | * them to the global incoming update message queue for later processing by |
| | | * replay threads. |
| | | */ |
| | | private ListenerThread listenerThread; |
| | | private volatile DirectoryThread listenerThread = null; |
| | | |
| | | /** |
| | | * A Map used to store all the ReplicationDomains created on this server. |
| | |
| | | * Also responsible for updating the list of pending changes |
| | | * @return the received message - null if none |
| | | */ |
| | | UpdateMsg receive() |
| | | private UpdateMsg receive() |
| | | { |
| | | UpdateMsg update = null; |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Completes the handling of an update that the ReplicationDomain has |
| | | * processed synchronously. |
| | | * <p> |
| | | * Calls {@link #processUpdateDone(UpdateMsg, String)} with a {@code null} |
| | | * replay error message and then updates the state with the CSN of the |
| | | * message. |
| | | * |
| | | * @param msg The UpdateMsg that was processed. |
| | | */ |
| | | void processUpdateDoneSynchronous(UpdateMsg msg) |
| | | { |
| | | /* |
| | | Warning: in synchronous mode there is no way to tell whether the replay of |
| | | an update went wrong. Just pass null to processUpdateDone so that, if |
| | | assured replication is used, the ack is sent without the error-at-replay |
| | | flag. |
| | | */ |
| | | processUpdateDone(msg, null); |
| | | state.update(msg.getCSN()); |
| | | } |
| | | |
| | | /** |
| | | * Check if the domain is connected to a ReplicationServer. |
| | | * |
| | | * @return true if the server is connected, false if not. |
| | |
| | | * Starts the receiver side of the Replication Service. |
| | | * <p> |
| | | * After this method has been called, the Replication Service will start |
| | | * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)} method. |
| | | * calling the {@link #processUpdate(UpdateMsg)} method. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(ReplicationDomainCfg)}. |
| | |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this); |
| | | final String threadName = "Replica DS(" + getServerId() |
| | | + ") listener for domain \"" + getBaseDNString() + "\""; |
| | | |
| | | listenerThread = new DirectoryThread(new Runnable() |
| | | { |
| | | @Override |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication Listener thread starting."); |
| | | } |
| | | |
| | | // Keep receiving and processing update messages until shutdown is initiated. |
| | | while (!listenerThread.isShutdownInitiated()) |
| | | { |
| | | final UpdateMsg updateMsg = receive(); |
| | | if (updateMsg == null) |
| | | { |
| | | // receive() returned null: the server is shutting down, so end the loop. |
| | | listenerThread.initiateShutdown(); |
| | | } |
| | | else if (processUpdate(updateMsg)) |
| | | { |
| | | /* |
| | | * processUpdate() returned true, i.e. the update was processed |
| | | * synchronously, so its completion is handled here. |
| | | * Warning: in synchronous mode there is no way to tell whether the |
| | | * replay of an update went wrong. Just pass null to |
| | | * processUpdateDone so that, if assured replication is used, the |
| | | * ack is sent without the error-at-replay flag. |
| | | */ |
| | | processUpdateDone(updateMsg, null); |
| | | state.update(updateMsg.getCSN()); |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication Listener thread stopping."); |
| | | } |
| | | } |
| | | }, threadName); |
| | | |
| | | listenerThread.start(); |
| | | } |
| | | } |
| | |
| | | // Stop the listener thread |
| | | if (listenerThread != null) |
| | | { |
| | | listenerThread.shutdown(); |
| | | listenerThread.waitForShutdown(); |
| | | listenerThread.initiateShutdown(); |
| | | try |
| | | { |
| | | listenerThread.join(); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Give up waiting. |
| | | } |
| | | listenerThread = null; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Indicates whether the listener thread has been asked to shut down or no |
| | | * longer exists. |
| | | * |
| | | * @return {@code true} if there is no listener thread or if its shutdown |
| | | * has been initiated, {@code false} otherwise. |
| | | */ |
| | | protected final boolean isListenerShuttingDown() |
| | | { |
| | | // Read the field once so the null check and the call see the same reference. |
| | | final DirectoryThread listener = listenerThread; |
| | | if (listener == null) |
| | | { |
| | | return true; |
| | | } |
| | | return listener.isShutdownInitiated(); |
| | | } |
| | | |
| | | /** |
| | | * Restart the Replication service after a {@link #disableService()}. |
| | | * <p> |
| | | * The Replication Service will restart from the point indicated by the |
| | |
| | | synchronized (sessionLock) |
| | | { |
| | | broker.start(); |
| | | |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this); |
| | | listenerThread.start(); |
| | | startListenService(); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public abstract long countEntries() throws DirectoryException; |
| | | |
| | | |
| | | |
| | | /** |
| | | * This method should handle the processing of {@link UpdateMsg} received from |
| | | * remote replication entities. |
| | |
| | | * |
| | | * @param updateMsg |
| | | * The {@link UpdateMsg} that was received. |
| | | * @param shutdown |
| | | * whether the server initiated shutdown |
| | | * @return A boolean indicating if the processing is completed at return time. |
| | | * If {@code true} is returned, no further processing is |
| | | * necessary. If {@code false} is returned, the subclass should |
| | | * call the method {@link #processUpdateDone(UpdateMsg, String)} and |
| | | * update the ServerState when this processing is complete. |
| | | */ |
| | | public abstract boolean processUpdate(UpdateMsg updateMsg, |
| | | AtomicBoolean shutdown); |
| | | public abstract boolean processUpdate(UpdateMsg updateMsg); |
| | | |
| | | /** |
| | | * This method must be called after each call to |
| | | * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the |
| | | * {@link #processUpdate(UpdateMsg)} when the processing of the |
| | | * update is completed. |
| | | * <p> |
| | | * It is useful for implementation needing to process the update in an |
| | |
| | | * this update, and this is the matching human readable message |
| | | * describing the problem. |
| | | */ |
| | | public void processUpdateDone(UpdateMsg msg, String replayErrorMsg) |
| | | protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg) |
| | | { |
| | | broker.updateWindowAfterReplay(); |
| | | |
| | |
| | | * The Replication Service will handle the delivery of this {@link UpdateMsg} |
| | | * to all the participants of this Replication Domain. These members will |
| | | * receive this {@link UpdateMsg} through a call of the |
| | | * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method. |
| | | * {@link #processUpdate(UpdateMsg)} method. |
| | | * |
| | | * @param msg The UpdateMsg that should be pushed. |
| | | */ |