| | |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session = null; |
| | | private final ServerState state; |
| | | private final String baseDn; |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | |
| | | * @param replicationDomain The replication domain that is creating us. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * @param baseDN The base DN that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverID2 The server ID that should be used by this broker |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param generationId The generationId for the server associated to the |
| | |
| | | * or zero if no CSN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, String baseDn, int serverID2, int window, |
| | | ServerState state, DN baseDN, int serverId, int window, |
| | | long generationId, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId, |
| | | long changeTimeHeartbeatInterval) |
| | | { |
| | | this.domain = replicationDomain; |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverID2; |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | this.state = state; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.replSessionSecurity = replSessionSecurity; |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDn.compareToIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0) |
| | | if (this.baseDN.toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } else |
| | |
| | | || (electedRsInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID(), |
| | | electedRsInfo.getGenerationId()); |
| | |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, |
| | | baseDn, |
| | | collectionToString(replicationServerInfos.keySet(), |
| | | ", ")); |
| | | baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", ")); |
| | | logError(message); |
| | | } |
| | | else |
| | | { |
| | | Message message = WARN_NO_AVAILABLE_CHANGELOGS.get( |
| | | serverId, baseDn); |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | warn user and start heartbeat monitor to recover when a server |
| | | with the right group id shows up. |
| | | */ |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte |
| | | .toString(groupId), Integer.toString(rsServerId), rsInfo |
| | | .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer |
| | | .toString(serverId)); |
| | | Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rsServerId), |
| | | rsInfo.getServerURL(), Byte.toString(getRsGroupId()), |
| | | baseDN.toNormalizedString(), Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e |
| | | .getLocalizedMessage() |
| | | + stackTraceToSingleLineString(e)); |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDN.toNormalizedString(), rsInfo.getServerURL(), |
| | | e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDn + " and with server id " |
| | | TRACER.debugInfo("RB for dn " + baseDN + " and with server id " |
| | | + serverId + " computed " + nChanges + " changes late."); |
| | | } |
| | | |
| | |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | |
| | | final String baseDn = this.baseDN.toNormalizedString(); |
| | | |
| | | Session localSession = null; |
| | | Socket socket = null; |
| | | boolean hasConnected = false; |
| | |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | // Open a socket connection to the next candidate. |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(serverId, url, baseDn, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | serverStartMsg = new ServerStartMsg(serverId, url, |
| | | baseDN.toNormalizedString(), maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg + "\nAND RECEIVED:\n" + msg); |
| | | } |
| | | |
| | |
| | | |
| | | // Sanity check |
| | | String repDn = replServerInfo.getBaseDn(); |
| | | if (!this.baseDn.equals(repDn)) |
| | | if (!baseDn.equals(repDn)) |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, |
| | | this.baseDn); |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, baseDn); |
| | | return null; |
| | | } |
| | | |
| | |
| | | { |
| | | if (!hasConnected || !keepConnection) |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | localSession.close(); |
| | | } |
| | | |
| | | if (socket != null) |
| | | { |
| | | try |
| | | { |
| | | socket.close(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | close(localSession); |
| | | close(socket); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null) |
| | |
| | | * reply message from the replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private TopologyMsg performECLPhaseTwoHandshake(String server) |
| | | private void performECLPhaseTwoHandshake(String server) |
| | | { |
| | | TopologyMsg topologyMsg = null; |
| | | |
| | | try |
| | | { |
| | | // Send our Start Session |
| | |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | session.publish(startECLSessionMsg); |
| | | |
| | | /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ? |
| | | * Read the TopologyMsg that should come back. |
| | | topologyMsg = (TopologyMsg) session.receive(); |
| | | */ |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + startECLSessionMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | connected = true; |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | | |
| | | // Be sure to return null. |
| | | topologyMsg = null; |
| | | } |
| | | return topologyMsg; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); |
| | | } |
| | | |
| | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | // Start a heartbeat monitor thread. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), |
| | | getRsServerId(), baseDn, session, heartbeatInterval); |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(), |
| | | baseDN.toNormalizedString(), session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | catch (Exception e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn, |
| | | e.getLocalizedMessage())); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this + " end restart : connected=" + connected |
| | | + " with RSid=" + this.getRsServerId() + " genid=" + this.generationID); |
| | | + " with RSid=" + getRsServerId() + " genid=" + this.generationID); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | | * RS performs a proper disconnection |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED |
| | | .get(replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | serverId, baseDn); |
| | | // RS performs a proper disconnection |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( |
| | | replicationServerID, savedSession.getReadableRemoteAddress(), |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | this.reStart(savedSession, true); |
| | | reStart(savedSession, true); |
| | | } |
| | | else if (msg instanceof MonitorMsg) |
| | | { |
| | |
| | | message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | baseDn); |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | else |
| | | { |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | bestServerInfo.getServerId(), baseDn); |
| | | bestServerInfo.getServerId(), |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | logError(message); |
| | | reStart(true); |
| | |
| | | final Session tmpSession = session; |
| | | if (tmpSession == null || !tmpSession.closeInitiated()) |
| | | { |
| | | /* |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED |
| | | .get(serverId, baseDn, replicationServerID, |
| | | savedSession.getReadableRemoteAddress()); |
| | | // We did not initiate the close on our side, log an error message. |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( |
| | | serverId, baseDN.toNormalizedString(), replicationServerID, |
| | | savedSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | } |
| | | |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will" |
| | | + " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDn); |
| | | + " domain " + baseDN); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | |
| | | if (connected) |
| | | { |
| | | return sendWindow.availablePermits(); |
| | | } else |
| | | { |
| | | return 0; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDn, |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(serverId), |
| | | ex.getLocalizedMessage() + stackTraceToSingleLineString(ex)); |
| | | ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | { |
| | | String threadName = "Replica DS(" |
| | | + this.getServerId() |
| | | String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + this.baseDn + "\" to RS(" + this.getRsServerId() |
| | | + this.baseDN + "\" to RS(" + getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |