| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private volatile boolean shutdown = false; |
| | | private final Object startStopLock = new Object(); |
| | | private volatile Collection<String> servers; |
| | | private volatile boolean connected = false; |
| | | private volatile String replicationServer = "Not connected"; |
| | |
| | | */ |
| | | public void start() |
| | | { |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Start the ReplicationBroker. |
| | |
| | | */ |
| | | public void start(Collection<String> servers) |
| | | { |
| | | synchronized (startStopLock) |
| | | { |
| | | /* |
| | | * Open Socket to the ReplicationServer |
| | | * Send the Start message |
| | | * Open Socket to the ReplicationServer Send the Start message |
| | | */ |
| | | shutdown = false; |
| | | this.servers = servers; |
| | |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gets the group id of the RS we are connected to. |
| | |
| | | */ |
| | | public void reStart(boolean infiniteTry) |
| | | { |
| | | reStart(this.session, infiniteTry); |
| | | reStart(session, infiniteTry); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void reStart(ProtocolSession failingSession, boolean infiniteTry) |
| | | { |
| | | |
| | | if (failingSession != null) |
| | | { |
| | | failingSession.close(); |
| | |
| | | |
| | | if (failingSession == session) |
| | | { |
| | | this.connected = false; |
| | | connected = false; |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | session = null; |
| | | } |
| | | while (!this.connected && (!this.shutdown)) |
| | | |
| | | while (true) |
| | | { |
| | | // Synchronize inside the loop in order to allow shutdown. |
| | | boolean needSleep = false; |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | | if (connected || shutdown) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | try |
| | | { |
| | | this.connect(); |
| | | } catch (Exception e) |
| | | connect(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDn, e.getLocalizedMessage())); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn, |
| | | e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | | if ((!connected) && (!infiniteTry)) |
| | | |
| | | if (connected || !infiniteTry) |
| | | { |
| | | break; |
| | | if ((!connected) && (!shutdown)) |
| | | } |
| | | |
| | | needSleep = true; |
| | | } |
| | | |
| | | if (needSleep) |
| | | { |
| | | try |
| | | { |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | debugInfo(this + |
| | | " end restart : connected=" + connected + |
| | | " with RSid=" + this.getRsServerId() + |
| | | " genid=" + this.generationID); |
| | | { |
| | | debugInfo(this + " end restart : connected=" + connected + " with RSid=" |
| | | + this.getRsServerId() + " genid=" + this.generationID); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | boolean reconnectOnFailure, boolean returnOnTopoChange) |
| | | throws SocketTimeoutException |
| | | { |
| | | while (shutdown == false) |
| | | while (!shutdown) |
| | | { |
| | | if ((reconnectOnFailure) && (!connected)) |
| | | if (reconnectOnFailure && !connected) |
| | | { |
| | | // infinite try to reconnect |
| | | reStart(null, true); |
| | |
| | | |
| | | // Save session information for later in case we need it for log messages |
| | | // after the session has been closed and/or failed. |
| | | final ProtocolSession failingSession = session; |
| | | final int replicationServerID = rsServerId; |
| | | final ProtocolSession savedSession = session; |
| | | if (savedSession == null) |
| | | { |
| | | // Must be shutting down. |
| | | break; |
| | | } |
| | | |
| | | final int replicationServerID = rsServerId; |
| | | try |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | | ReplicationMsg msg = savedSession.receive(); |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | synchronized (this) |
| | |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } else if (msg instanceof TopologyMsg) |
| | | } |
| | | else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | receiveTopo(topoMsg); |
| | |
| | | if (returnOnTopoChange) |
| | | return msg; |
| | | |
| | | } else if (msg instanceof StopMsg) |
| | | } |
| | | else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | | * RS performs a proper disconnection |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED |
| | | .get(replicationServerID, |
| | | failingSession.getReadableRemoteAddress(), |
| | | savedSession.getReadableRemoteAddress(), |
| | | serverId, baseDn); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | this.reStart(failingSession, true); |
| | | } else if (msg instanceof MonitorMsg) |
| | | this.reStart(savedSession, true); |
| | | } |
| | | else if (msg instanceof MonitorMsg) |
| | | { |
| | | // This is the response to a MonitorRequest that was sent earlier or |
| | | // the regular message of the monitoring publisher of the RS. |
| | |
| | | { |
| | | message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | failingSession.getReadableRemoteAddress(), |
| | | savedSession.getReadableRemoteAddress(), |
| | | baseDn); |
| | | } |
| | | else |
| | | { |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | failingSession.getReadableRemoteAddress(), |
| | | savedSession.getReadableRemoteAddress(), |
| | | bestServerInfo.getServerId(), baseDn); |
| | | } |
| | | logError(message); |
| | |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | } |
| | | } |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | return msg; |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | throw e; |
| | | } catch (Exception e) |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | if (shutdown == false) |
| | | if (!shutdown) |
| | | { |
| | | if ((session == null) || (!session.closeInitiated())) |
| | | final ProtocolSession tmpSession = session; |
| | | if (tmpSession == null || !tmpSession.closeInitiated()) |
| | | { |
| | | /* |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED |
| | | .get(serverId, baseDn, replicationServerID, |
| | | failingSession.getReadableRemoteAddress()); |
| | | savedSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | } |
| | | |
| | | if (reconnectOnFailure) |
| | | reStart(failingSession, true); |
| | | { |
| | | reStart(savedSession, true); |
| | | } |
| | | else |
| | | break; // does not seem necessary to explicitely disconnect .. |
| | | { |
| | | break; // does not seem necessary to explicitly disconnect .. |
| | | } |
| | | } |
| | | } |
| | | } // while !shutdown |
| | |
| | | " close the connection to replication server " + rsServerId + " for" + |
| | | " domain " + baseDn); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = true; |
| | | connected = false; |
| | | stopRSHeartBeatMonitoring(); |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | replicationServer = "stopped"; |
| | | shutdown = true; |
| | | connected = false; |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | |
| | | session.close(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Set a timeout value. |