| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.collectionToString; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatMonitor; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | |
| | | |
| | | // Get info from every available replication servers |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | | String rsis = replicationServerInfos.toString(); |
| | | |
| | | ReplicationServerInfo replicationServerInfo = null; |
| | | |
| | |
| | | this.getGenerationID()) || |
| | | (replicationServerInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | Integer.toString(rsServerId), |
| | | replicationServer, |
| | | Integer.toString(serverId), |
| | | Long.toString(this.getGenerationID())); |
| | | Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.getGenerationID()), |
| | | Long.toString(replicationServerInfo.getGenerationId())); |
| | | Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID(), |
| | | replicationServerInfo.getGenerationId()); |
| | | logError(message); |
| | | } |
| | | } else |
| | |
| | | { |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | | Message message = |
| | | NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis); |
| | | logError(message); |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, |
| | | baseDn, |
| | | collectionToString(replicationServerInfos.keySet(), |
| | | ", ")); |
| | | logError(message); |
| | | } |
| | | else |
| | | { |
| | | Message message = WARN_NO_AVAILABLE_CHANGELOGS.get( |
| | | serverId, baseDn); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * Connect to the provided server performing the first phase handshake (start |
| | | * messages exchange) and return the reply message from the replication |
| | | * server, wrapped in a ReplicationServerInfo object. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The answer from the server . Null if could not |
| | | * get an answer. |
| | | * @param server |
| | | * Server to connect to. |
| | | * @param keepConnection |
| | | * Do we keep session opened or not after handshake. Use true if want |
| | | * to perform handshake phase 2 with the same session and keep the |
| | | * session to create as the current one. |
| | | * @return The answer from the server . Null if could not get an answer. |
| | | */ |
| | | private ReplicationServerInfo performPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | private ReplicationServerInfo performPhaseOneHandshake( |
| | | String server, boolean keepConnection) |
| | | { |
| | | ReplicationServerInfo replServerInfo = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | ProtocolSession localSession = null; |
| | | |
| | | boolean error = false; |
| | | ProtocolSession localSession = null; |
| | | Socket socket = null; |
| | | boolean hasConnected = false; |
| | | Message errorMessage = null; |
| | | |
| | | try |
| | | { |
| | | /* |
| | |
| | | */ |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | Socket socket = new Socket(); |
| | | InetAddress.getByName(hostname), intPort); |
| | | socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | localSession = replSessionSecurity.createClientSession(server, socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | /* |
| | | * Send our ServerStartMsg. |
| | | */ |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | | groupId); |
| | | localSession = replSessionSecurity.createClientSession( |
| | | socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = replSessionSecurity |
| | | .isSslEncryption(server); |
| | | |
| | | // Send our ServerStartMsg. |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, |
| | | baseDn, maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | localSession.publish(serverStartMsg); |
| | | |
| | | /* |
| | | * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come |
| | | * back. |
| | | */ |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | | // come back. |
| | | ReplicationMsg msg = localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + msg.toString()); |
| | | debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg.toString() + "\nAND RECEIVED:\n" |
| | | + msg.toString()); |
| | | } |
| | | |
| | | // Wrap received message in a server info object |
| | | replServerInfo = ReplicationServerInfo.newInstance(msg); |
| | | ReplicationServerInfo replServerInfo = ReplicationServerInfo |
| | | .newInstance(msg); |
| | | |
| | | // Sanity check |
| | | String repDn = replServerInfo.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn); |
| | | logError(message); |
| | | error = true; |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn); |
| | | return null; |
| | | } |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | * We have sent our own protocol version to the replication server. The |
| | | * replication server will use the same one (or an older one if it is an |
| | | * old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerInfo.getProtocolVersion()); |
| | | protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo |
| | | .getProtocolVersion()); |
| | | localSession.setProtocolVersion(protocolVersion); |
| | | |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | localSession.stopEncryption(); |
| | | } |
| | | } catch (ConnectException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | debugInfo(message.toString()); |
| | | } |
| | | } |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | if ((e instanceof SocketTimeoutException) && debugEnabled()) |
| | | { |
| | | debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | | } |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | debugInfo(message.toString()); |
| | | } |
| | | error = true; |
| | | } |
| | | |
| | | // Close session if requested |
| | | if (!keepConnection || error) |
| | | { |
| | | if (localSession != null) |
| | | hasConnected = true; |
| | | |
| | | // If this connection as the one to use for sending and receiving |
| | | // updates, store it. |
| | | if (keepConnection) |
| | | { |
| | | if (debugEnabled()) { |
| | | debugInfo("In RB, closing session after phase 1"); |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerInfo; |
| | | } |
| | | catch (ConnectException e) |
| | | { |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId, |
| | | server, baseDn); |
| | | return null; |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId, |
| | | server, baseDn); |
| | | return null; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | if (!hasConnected || !keepConnection) |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | localSession.close(); |
| | | } |
| | | |
| | | localSession.close(); |
| | | localSession = null; |
| | | if (socket != null) |
| | | { |
| | | try |
| | | { |
| | | socket.close(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | } |
| | | if (error) |
| | | |
| | | if (!hasConnected && errorMessage != null) |
| | | { |
| | | replServerInfo = null; |
| | | } // Be sure to return null. |
| | | // There was no server waiting on this host:port Log a notice and try |
| | | // the next replicationServer in the list |
| | | if (!connectionError) |
| | | { |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | logError(errorMessage); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo(errorMessage.toString()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // If this connection as the one to use for sending and receiving updates, |
| | | // store it. |
| | | if (keepConnection) |
| | | { |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerInfo; |
| | | } |
| | | |
| | | /** |
| | |
| | | private ReplServerStartDSMsg performECLPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | // FIXME: this should be merged with performPhaseOneHandshake to avoid |
| | | // code/bug duplication. |
| | | ReplServerStartDSMsg replServerStartDSMsg = null; |
| | | |
| | | // Parse server string. |
| | |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | localSession = replSessionSecurity.createClientSession(server, socket, |
| | | localSession = replSessionSecurity.createClientSession(socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId, |
| | | server, baseDn); |
| | | |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | |
| | | debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn); |
| | | } |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | logError(message); |
| | |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | if (session != null) |
| | |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | if (session != null) |
| | |
| | | // Start a heartbeat monitor thread. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | String threadName = "Replica DS(" |
| | | + this.getServerId() + ") heartbeat monitor for domain \"" |
| | | + this.baseDn + "\" from RS(" + this.getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | |
| | | heartbeatMonitor = new HeartbeatMonitor( |
| | | threadName, |
| | | session, |
| | | heartbeatInterval); |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), |
| | | getRsServerId(), baseDn, session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | reStart(null, true); |
| | | } |
| | | |
| | | ProtocolSession failingSession = session; |
| | | // Save session information for later in case we need it for log messages |
| | | // after the session has been closed and/or failed. |
| | | final ProtocolSession failingSession = session; |
| | | final int replicationServerID = rsServerId; |
| | | |
| | | try |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | |
| | | /* |
| | | * RS performs a proper disconnection |
| | | */ |
| | | Message message = |
| | | NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer, |
| | | Integer.toString(rsServerId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED |
| | | .get(replicationServerID, |
| | | failingSession.getReadableRemoteAddress(), |
| | | serverId, baseDn); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | this.reStart(failingSession, true); |
| | | } else if (msg instanceof MonitorMsg) |
| | |
| | | { |
| | | bestServerId = bestServerInfo.getServerId(); |
| | | } |
| | | Message message = |
| | | NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(), |
| | | Integer.toString(serverId), |
| | | Integer.toString(rsServerId), |
| | | rsServerUrl, |
| | | Integer.toString(bestServerId)); |
| | | Message message = NOTE_NEW_BEST_REPLICATION_SERVER |
| | | .get(serverId, replicationServerID, |
| | | failingSession.getReadableRemoteAddress(), |
| | | bestServerId, baseDn); |
| | | logError(message); |
| | | reStart(true); |
| | | } |
| | |
| | | /* |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = |
| | | ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer, |
| | | Integer.toString(rsServerId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED |
| | | .get(serverId, baseDn, replicationServerID, |
| | | failingSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | } |
| | | if (reconnectOnFailure) |