| | |
| | | } |
| | | |
| | | /** |
| | | * Set the generation id - for test purpose. |
| | | * @param generationID The generation id |
| | | */ |
| | | public void setGenerationID(long generationID) |
| | | { |
| | | this.generationID = generationID; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server url of the RS we are connected to. |
| | | * @return The server url of the RS we are connected to |
| | | */ |
| | |
| | | { |
| | | this.locallyConfigured = locallyConfigured; |
| | | } |
| | | |
| | | /** |
| | | * Returns a string representation of this object. |
| | | * @return A string representation of this object. |
| | | */ |
| | | public String toString() |
| | | { |
| | | return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId; |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "phase 2 : will perform PhaseOneH with the preferred RS."); |
| | | "phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + replicationServerInfo); |
| | | replicationServerInfo = performPhaseOneHandshake( |
| | | replicationServerInfo.getServerURL(), true); |
| | | |
| | |
| | | |
| | | /** |
| | | * restart the ReplicationBroker. |
| | | * @param infiniteTry the socket which failed |
| | | */ |
| | | public void reStart() |
| | | public void reStart(boolean infiniteTry) |
| | | { |
| | | reStart(this.session); |
| | | reStart(this.session, infiniteTry); |
| | | } |
| | | |
| | | /** |
| | | * Restart the ReplicationServer broker after a failure. |
| | | * |
| | | * @param failingSession the socket which failed |
| | | * @param infiniteTry the socket which failed |
| | | */ |
| | | public void reStart(ProtocolSession failingSession) |
| | | public void reStart(ProtocolSession failingSession, boolean infiniteTry) |
| | | { |
| | | |
| | | if (failingSession != null) |
| | |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | session = null; |
| | | } |
| | | while (!this.connected && (!this.shutdown)) |
| | | { |
| | |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | | if ((!connected) && (!infiniteTry)) |
| | | break; |
| | | if ((!connected) && (!shutdown)) |
| | | { |
| | | try |
| | |
| | | } |
| | | } |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + |
| | | " end restart : connected=" + connected + |
| | | " with RSid=" + this.getRsServerId() + |
| | | " genid=" + this.generationID); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void publish(ReplicationMsg msg) |
| | | { |
| | | _publish(msg, false); |
| | | _publish(msg, false, true); |
| | | } |
| | | |
| | | /** |
| | | * Publish a message to the other servers. |
| | | * @param msg The message to publish. |
| | | * @param retryOnFailure Whether reconnect should automatically be done. |
| | | * @return Whether publish succeeded. |
| | | */ |
| | | public boolean publish(ReplicationMsg msg, boolean retryOnFailure) |
| | | { |
| | | return _publish(msg, false, retryOnFailure); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void publishRecovery(ReplicationMsg msg) |
| | | { |
| | | _publish(msg, true); |
| | | _publish(msg, true, true); |
| | | } |
| | | |
| | | /** |
| | | * Publish a message to the other servers. |
| | | * @param msg the message to publish |
| | | * @param recoveryMsg the message is a recovery Message |
| | | * @param retryOnFailure whether retry should be done on failure |
| | | * @return whether the message was successfully sent. |
| | | */ |
| | | void _publish(ReplicationMsg msg, boolean recoveryMsg) |
| | | boolean _publish(ReplicationMsg msg, boolean recoveryMsg, |
| | | boolean retryOnFailure) |
| | | { |
| | | boolean done = false; |
| | | |
| | |
| | | "message is not possible due to existing connection error."); |
| | | } |
| | | |
| | | return; |
| | | return false; |
| | | } |
| | | |
| | | try |
| | |
| | | // do it. |
| | | if (!recoveryMsg & connectRequiresRecovery) |
| | | { |
| | | return; |
| | | return false; |
| | | } |
| | | |
| | | if (msg instanceof UpdateMsg) |
| | |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | if (!retryOnFailure) |
| | | return false; |
| | | |
| | | // The receive threads should handle reconnection or |
| | | // mark this broker in error. Just retry. |
| | | synchronized (connectPhaseLock) |
| | |
| | | } |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public ReplicationMsg receive() throws SocketTimeoutException |
| | | { |
| | | return receive(false); |
| | | return receive(false, true, false); |
| | | } |
| | | |
| | | /** |
| | |
| | | * called in a single thread or protected by a locking mechanism |
| | | * before being called. |
| | | * |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | * @param allowReconnectionMechanism If true, this allows the reconnection |
| | | * mechanism to disconnect the broker if it detects that it should reconnect |
| | | * to another replication server because of some criteria defined by the |
| | | * algorithm where we choose a suitable replication server. |
| | | * @param reconnectToTheBestRS Whether broker will automatically switch |
| | | * to the best suitable RS. |
| | | * @param reconnectOnFailure Whether broker will automatically reconnect |
| | | * on failure. |
| | | * @param returnOnTopoChange Whether broker should return TopologyMsg |
| | | * received. |
| | | * @return the received message |
| | | * |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | */ |
| | | public ReplicationMsg receive(boolean allowReconnectionMechanism) |
| | | public ReplicationMsg receive(boolean reconnectToTheBestRS, |
| | | boolean reconnectOnFailure, boolean returnOnTopoChange) |
| | | throws SocketTimeoutException |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | | if (!connected) |
| | | if ((reconnectOnFailure) && (!connected)) |
| | | { |
| | | reStart(null); |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | | } |
| | | |
| | | ProtocolSession failingSession = session; |
| | |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | receiveTopo(topoMsg); |
| | | if (allowReconnectionMechanism) |
| | | if (reconnectToTheBestRS) |
| | | { |
| | | // Reset wait time before next computation of best server |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | } |
| | | |
| | | // Caller wants to check what's changed |
| | | if (returnOnTopoChange) |
| | | return msg; |
| | | |
| | | } else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | // Try to find a suitable RS |
| | | this.reStart(failingSession); |
| | | this.reStart(failingSession, true); |
| | | } else if (msg instanceof MonitorMsg) |
| | | { |
| | | // This is the response to a MonitorRequest that was sent earlier or |
| | |
| | | // it is still the one we are currently connected to. If not, |
| | | // disconnect properly and let the connection algorithm re-connect to |
| | | // best replication server |
| | | if (allowReconnectionMechanism) |
| | | if (reconnectToTheBestRS) |
| | | { |
| | | mustRunBestServerCheckingAlgorithm++; |
| | | if (mustRunBestServerCheckingAlgorithm == 2) |
| | |
| | | NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(), |
| | | Integer.toString(serverId), |
| | | Integer.toString(rsServerId), |
| | | rsServerUrl); |
| | | rsServerUrl, |
| | | Integer.toString(bestServerInfo.getServerId())); |
| | | logError(message); |
| | | reStart(); |
| | | reStart(null, true); |
| | | } |
| | | |
| | | // Reset wait time before next computation of best server |
| | |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | this.reStart(failingSession); |
| | | if (reconnectOnFailure) |
| | | reStart(failingSession, true); |
| | | else |
| | | break; // does not seem necessary to explicitely disconnect .. |
| | | } |
| | | } |
| | | } |
| | | } // while !shutdown |
| | | return null; |
| | | } |
| | | |
| | |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker " + serverId + " is stopping and will" + |
| | | " close the connection to replication server " + rsServerId + " for" + |
| | | " domain " + baseDn); |
| | | } |
| | | |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | replicationServer = "stopped"; |
| | |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | |
| | | if (session != null) |
| | | try |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly end communications |
| | | try |
| | | { |
| | | session.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | session.close(); |
| | | } catch (Exception e) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg); |
| | | |
| | | // Store new DS list |
| | | dsList = topoMsg.getDsList(); |
| | | |
| | |
| | | { |
| | | connectRequiresRecovery = b; |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the broker is shutting down. |
| | | * @return whether the broker is shutting down. |
| | | */ |
| | | public boolean shuttingDown() |
| | | { |
| | | return shutdown; |
| | | } |
| | | |
| | | } |