| | |
| | | */ |
| | | public final static String NO_CONNECTED_SERVER = "Not connected"; |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session = null; |
| | | private volatile Session session; |
| | | private final ServerState state; |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | |
| | | // Send our Start Session |
| | | StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | session.publish(startECLSessionMsg); |
| | | final Session localSession = session; |
| | | localSession.publish(startECLSessionMsg); |
| | | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | localSession.setSoTimeout(timeout); |
| | | connected = true; |
| | | } catch (Exception e) |
| | | { |
| | |
| | | private TopologyMsg performPhaseTwoHandshake(String server, |
| | | ServerStatus initStatus) |
| | | { |
| | | TopologyMsg topologyMsg; |
| | | |
| | | try |
| | | { |
| | | /* |
| | |
| | | startSessionMsg = |
| | | new StartSessionMsg(initStatus, new ArrayList<String>()); |
| | | } |
| | | session.publish(startSessionMsg); |
| | | final Session localSession = session; |
| | | localSession.publish(startSessionMsg); |
| | | |
| | | /* |
| | | * Read the TopologyMsg that should come back. |
| | | */ |
| | | topologyMsg = (TopologyMsg) session.receive(); |
| | | TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | |
| | | localSession.setSoTimeout(timeout); |
| | | return topologyMsg; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | |
| | | setSession(null); |
| | | |
| | | // Be sure to return null. |
| | | topologyMsg = null; |
| | | return null; |
| | | } |
| | | return topologyMsg; |
| | | } |
| | | |
| | | /** |
| | |
| | | * local DS |
| | | * - replication server in the same VM as local DS one |
| | | */ |
| | | // TODO JNR log why an RS was evicted as best server |
| | | Map<Integer, ReplicationServerInfo> bestServers = rsInfos; |
| | | /* |
| | | The list of best replication servers is filtered with each criterion. At |
| | |
| | | Check the session. If it has changed, some disconnection or |
| | | reconnection happened and we need to restart from scratch. |
| | | */ |
| | | |
| | | if (session != null && session == currentSession) |
| | | final Session localSession = session; |
| | | if (localSession != null && session == currentSession) |
| | | { |
| | | session.publish(msg); |
| | | localSession.publish(msg); |
| | | done = true; |
| | | } |
| | | } |
| | |
| | | window update message was lost somehow... |
| | | then loop to check again if connection was closed. |
| | | */ |
| | | if (session != null) { |
| | | session.publish(new WindowProbeMsg()); |
| | | Session localSession = session; |
| | | if (localSession != null) |
| | | { |
| | | localSession.publish(new WindowProbeMsg()); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | // Save session information for later in case we need it for log messages |
| | | // after the session has been closed and/or failed. |
| | | final Session savedSession = session; |
| | | if (savedSession == null) |
| | | final Session localSession = session; |
| | | if (localSession == null) |
| | | { |
| | | // Must be shutting down. |
| | | break; |
| | |
| | | final int previousRsServerID = rsServerId; |
| | | try |
| | | { |
| | | ReplicationMsg msg = savedSession.receive(); |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | synchronized (this) |
| | |
| | | { |
| | | // RS performs a proper disconnection |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( |
| | | previousRsServerID, savedSession.getReadableRemoteAddress(), |
| | | previousRsServerID, localSession.getReadableRemoteAddress(), |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | reStart(savedSession, true); |
| | | reStart(localSession, true); |
| | | } |
| | | else if (msg instanceof MonitorMsg) |
| | | { |
| | |
| | | { |
| | | message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( |
| | | serverId, previousRsServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | localSession.getReadableRemoteAddress(), |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | else |
| | | { |
| | | // TODO JNR log why an RS was evicted as best server |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, previousRsServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | localSession.getReadableRemoteAddress(), |
| | | bestServerInfo.getServerId(), |
| | | baseDN.toNormalizedString()); |
| | | } |
| | |
| | | // We did not initiate the close on our side, log an error message. |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( |
| | | serverId, baseDN.toNormalizedString(), previousRsServerID, |
| | | savedSession.getReadableRemoteAddress()); |
| | | localSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | } |
| | | |
| | | if (reconnectOnFailure) |
| | | { |
| | | reStart(savedSession, true); |
| | | reStart(localSession, true); |
| | | } |
| | | else |
| | | { |
| | |
| | | try |
| | | { |
| | | updateDoneCount++; |
| | | if ((updateDoneCount >= halfRcvWindow) && (session != null)) |
| | | final Session localSession = session; |
| | | if (updateDoneCount >= halfRcvWindow && localSession != null) |
| | | { |
| | | session.publish(new WindowMsg(updateDoneCount)); |
| | | localSession.publish(new WindowMsg(updateDoneCount)); |
| | | rcvWindow += updateDoneCount; |
| | | updateDoneCount = 0; |
| | | } |
| | |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | // Remember the requested timeout in this.timeout, then apply it to the |
| | | // current session if one is set. |
| | | // NOTE(review): this span appears to interleave the pre- and post-change |
| | | // lines of a diff; the localSession lines look like the replacement for |
| | | // the direct accesses to the session field -- confirm before relying on it. |
| | | this.timeout = timeout; |
| | | if (session != null) |
| | | // Copy the volatile session field into a local so that the null check and |
| | | // the setSoTimeout call below operate on the same reference. |
| | | final Session localSession = session; |
| | | if (localSession != null) |
| | | { |
| | | session.setSoTimeout(timeout); |
| | | localSession.setSoTimeout(timeout); |
| | | } |
| | | } |
| | | |
| | |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | { |
| | | String threadName = "Replica DS(" + getServerId() |
| | | final Session localSession = session; |
| | | final String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + this.baseDN + "\" to RS(" + getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | + baseDN + "\" to RS(" + getRsServerId() |
| | | + ") at " + localSession.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | | threadName, session, changeTimeHeartbeatSendInterval, |
| | | serverId); |
| | | threadName, localSession, changeTimeHeartbeatSendInterval, serverId); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } else |
| | | { |
| | |
| | | DirectoryServer.deregisterMonitorProvider(monitor); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | // Rendered as "<simple class name> <baseDN> <serverId>". |
| | | final StringBuilder description = |
| | | new StringBuilder(getClass().getSimpleName()); |
| | | description.append(' ').append(baseDN).append(' ').append(serverId); |
| | | return description.toString(); |
| | | } |
| | | } |