| | |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | mustRunBestServerCheckingAlgorithm = 0; |
| | | |
| | | boolean newServerWithSameGroupId = false; |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | /* |
| | |
| | | // Get info from every available replication servers |
| | | replicationServerInfos = collectReplicationServersInfo(); |
| | | |
| | | ReplicationServerInfo replicationServerInfo = null; |
| | | ReplicationServerInfo electedRsInfo = null; |
| | | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | // At least one server answered, find the best one. |
| | | replicationServerInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, baseDn, groupId, |
| | | this.getGenerationID()); |
| | | electedRsInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, baseDn, groupId, getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | | debugInfo("serverId: " + serverId + |
| | | " phase 2 : will perform PhaseOneH with the preferred RS=" |
| | | + replicationServerInfo); |
| | | replicationServerInfo = performPhaseOneHandshake( |
| | | replicationServerInfo.getServerURL(), true, false); |
| | | + electedRsInfo); |
| | | electedRsInfo = performPhaseOneHandshake( |
| | | electedRsInfo.getServerURL(), true, false); |
| | | |
| | | if (replicationServerInfo != null) |
| | | if (electedRsInfo != null) |
| | | { |
| | | // Update replication server info with potentially more up to date |
| | | // data (server state for instance may have changed) |
| | | replicationServerInfos.put(replicationServerInfo.getServerId(), |
| | | replicationServerInfo); |
| | | replicationServerInfos |
| | | .put(electedRsInfo.getServerId(), electedRsInfo); |
| | | |
| | | // Handshake phase 1 exchange went well |
| | | |
| | | // Compute in which status we are starting the session to tell the RS |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(replicationServerInfo.getGenerationId(), |
| | | replicationServerInfo.getServerState(), |
| | | replicationServerInfo.getDegradedStatusThreshold(), |
| | | this.getGenerationID()); |
| | | computeInitialServerStatus(electedRsInfo.getGenerationId(), |
| | | electedRsInfo.getServerState(), |
| | | electedRsInfo.getDegradedStatusThreshold(), |
| | | getGenerationID()); |
| | | |
| | | // Perform session start (handshake phase 2) |
| | | TopologyMsg topologyMsg = performPhaseTwoHandshake( |
| | | replicationServerInfo.getServerURL(), initStatus); |
| | | electedRsInfo.getServerURL(), initStatus); |
| | | |
| | | if (topologyMsg != null) // Handshake phase 2 exchange went well |
| | | { |
| | | try |
| | | { |
| | | /* |
| | | * If we just connected to a RS with a different group id than us |
| | | * (because for instance a RS with our group id was unreachable |
| | | * while connecting to each RS) but the just received TopologyMsg |
| | | * shows that at the same time a RS with our group id connected, |
| | | * we must give up the connection to force reconnection that will |
| | | * certainly go back to a server with our group id, as servers with |
| | | * our group id have a greater priority for connection (in |
| | | * computeBestReplicationServer). In other words, we disconnect to |
| | | * connect to a server with our group id. If a server with our |
| | | * group id comes back later in the topology, we will be advised |
| | | * upon reception of a new TopologyMsg message and we will force |
| | | * reconnection at that time to retrieve a server with our group |
| | | * id. |
| | | */ |
| | | byte tmpRsGroupId = replicationServerInfo.getGroupId(); |
| | | boolean someServersWithSameGroupId = |
| | | hasSomeServerWithSameGroupId(topologyMsg.getRsList()); |
| | | |
| | | // Really no other server with our group id ? |
| | | if ((tmpRsGroupId == groupId) || (!someServersWithSameGroupId)) |
| | | { |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | | maxSendWindow = replicationServerInfo.getWindowSize(); |
| | | rsGroupId = replicationServerInfo.getGroupId(); |
| | | rsServerId = replicationServerInfo.getServerId(); |
| | | rsServerUrl = replicationServerInfo.getServerURL(); |
| | | |
| | | receiveTopo(topologyMsg); |
| | | |
| | | // Log a message to let the administrator know that the failure |
| | | // was resolved. |
| | | // Wakeup all the thread that were waiting on the window |
| | | // on the previous connection. |
| | | connectionError = false; |
| | | if (sendWindow != null) |
| | | { |
| | | /* |
| | | * Fix (hack) for OPENDJ-401: we want to ensure that no |
| | | * threads holding this semaphore will get blocked when they |
| | | * acquire it. However, we also need to make sure that we |
| | | * don't overflow the semaphore by releasing too many permits. |
| | | */ |
| | | final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2); |
| | | if (sendWindow.availablePermits() < MAX_PERMITS) |
| | | { |
| | | /* |
| | | * At least 2^29 acquisitions would need to occur for this |
| | | * to be insufficient. In addition, at least 2^30 releases |
| | | * would need to occur for this to potentially overflow. |
| | | * Hopefully this is unlikely to happen. |
| | | */ |
| | | sendWindow.release(MAX_PERMITS); |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = maxRcvWindow; |
| | | connected = true; |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated( |
| | | initStatus, replicationServerInfo.getServerState(), |
| | | replicationServerInfo.getGenerationId(), |
| | | session); |
| | | } |
| | | |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | // Connected to replication server with wrong group id: |
| | | // warn user and start poller to recover when a server with |
| | | // right group id arrives... |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rsServerId), |
| | | replicationServerInfo.getServerURL(), |
| | | Byte.toString(getRsGroupId()), |
| | | baseDn, Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | if (replicationServerInfo.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | startChangeTimeHeartBeatPublishing(); |
| | | } |
| | | } else |
| | | { |
| | | // Detected new RS with our group id: log disconnection to |
| | | // inform administrator |
| | | Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get( |
| | | Byte.toString(groupId), baseDn.toString(), |
| | | Integer.toString(serverId)); |
| | | logError(message); |
| | | // Do not log connection error |
| | | newServerWithSameGroupId = true; |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn, replicationServerInfo.getServerURL(), |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | | ProtocolSession localSession = session; |
| | | if (localSession != null) |
| | | { |
| | | localSession.close(); |
| | | session = null; |
| | | } |
| | | } |
| | | } |
| | | connectToReplicationServer(electedRsInfo, initStatus, topologyMsg); |
| | | } // Could perform handshake phase 2 with best |
| | | |
| | | } // Could perform handshake phase 1 with best |
| | |
| | | { |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((replicationServerInfo.getGenerationId() == |
| | | this.getGenerationID()) || |
| | | (replicationServerInfo.getGenerationId() == -1)) |
| | | if ((electedRsInfo.getGenerationId() == getGenerationID()) |
| | | || (electedRsInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | |
| | | .get(serverId, rsServerId, baseDn, |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID(), |
| | | replicationServerInfo.getGenerationId()); |
| | | electedRsInfo.getGenerationId()); |
| | | logError(message); |
| | | } |
| | | } else |
| | |
| | | * This server could not find any replicationServer. It's going to start |
| | | * in degraded mode. Log a message. |
| | | */ |
| | | if (!connectionError && !newServerWithSameGroupId) |
| | | if (!connectionError) |
| | | { |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Has the passed RS info list some servers with our group id ? |
| | | * @return true if at least one server has the same group id |
| | | * Connects to a replication server. |
| | | * |
| | | * @param rsInfo |
| | | * the Replication Server to connect to |
| | | * @param initStatus |
| | | * The status to enter the state machine with |
| | | * @param topologyMsg |
| | | * the message containing the topology information |
| | | */ |
| | | private boolean hasSomeServerWithSameGroupId(List<RSInfo> rsInfos) |
| | | private void connectToReplicationServer(ReplicationServerInfo rsInfo, |
| | | ServerStatus initStatus, TopologyMsg topologyMsg) |
| | | { |
| | | for (RSInfo rsInfo : rsInfos) |
| | | try |
| | | { |
| | | if (rsInfo.getGroupId() == this.groupId) |
| | | return true; |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | | maxSendWindow = rsInfo.getWindowSize(); |
| | | rsGroupId = rsInfo.getGroupId(); |
| | | rsServerId = rsInfo.getServerId(); |
| | | rsServerUrl = rsInfo.getServerURL(); |
| | | |
| | | receiveTopo(topologyMsg); |
| | | |
| | | // The connection failure is resolved: clear the error flag, then |
| | | // wake up all the threads that were waiting on the send window |
| | | // of the previous connection by releasing a large number of |
| | | // permits on the old semaphore before replacing it. |
| | | connectionError = false; |
| | | if (sendWindow != null) |
| | | { |
| | | /* |
| | | * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding |
| | | * this semaphore will get blocked when they acquire it. However, we |
| | | * also need to make sure that we don't overflow the semaphore by |
| | | * releasing too many permits. |
| | | */ |
| | | final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2); |
| | | if (sendWindow.availablePermits() < MAX_PERMITS) |
| | | { |
| | | /* |
| | | * At least 2^29 acquisitions would need to occur for this to be |
| | | * insufficient. In addition, at least 2^30 releases would need to |
| | | * occur for this to potentially overflow. Hopefully this is unlikely |
| | | * to happen. |
| | | */ |
| | | sendWindow.release(MAX_PERMITS); |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = maxRcvWindow; |
| | | connected = true; |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (domain != null) |
| | | { |
| | | domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo |
| | | .getGenerationId(), session); |
| | | } |
| | | |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | // Connected to a replication server with the wrong group id: |
| | | // warn the user. NOTE(review): the original comment mentions starting |
| | | // a poller to recover; no such call is visible here - confirm. |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte |
| | | .toString(groupId), Integer.toString(rsServerId), rsInfo |
| | | .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer |
| | | .toString(serverId)); |
| | | logError(message); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | | if (rsInfo.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | startChangeTimeHeartBeatPublishing(); |
| | | } |
| | | } |
| | | return false; |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e |
| | | .getLocalizedMessage() |
| | | + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | | ProtocolSession localSession = session; |
| | | if (localSession != null) |
| | | { |
| | | localSession.close(); |
| | | session = null; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |