| | |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchListener; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | |
| | | private ProtocolSession session = null; |
| | | private final ServerState state; |
| | | private final DN baseDn; |
| | | private final short serverID; |
| | | private final short serverId; |
| | | private int maxSendDelay; |
| | | private int maxReceiveDelay; |
| | | private int maxSendQueue; |
| | |
| | | private short protocolVersion; |
| | | private long generationId = -1; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | // My group id |
| | | private byte groupId = (byte) -1; |
| | | // The group id of the RS we are connected to |
| | | private byte rsGroupId = (byte) -1; |
| | | // The server id of the RS we are connected to |
| | | private short rsServerId = -1; |
| | | // The server URL of the RS we are connected to |
| | | private String rsServerUrl = null; |
| | | // Our replication domain |
| | | private ReplicationDomain replicationDomain = null; |
| | | |
| | | // Trick for avoiding a inner class for many parameters return for |
| | | // performHandshake method. |
| | | // performPhaseOneHandshake method. |
| | | private String tmpReadableServerName = null; |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | private boolean connectionError = false; |
| | | private final Object connectPhaseLock = new Object(); |
| | | |
| | | // Same group id poller thread |
| | | private SameGroupIdPoller sameGroupIdPoller = null; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | | * |
| | | * @param replicationDomain The replication domain that is creating us. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negociating the session with the replicationServer. |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * when negociating the session with the replicationServer. |
| | | * @param serverID The server ID that should be used by this broker |
| | | * when negociating the session with the replicationServer. |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param maxReceiveQueue The maximum size of the receive queue to use on |
| | | * the replicationServer. |
| | | * @param maxReceiveDelay The maximum replication delay to use on the |
| | |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | | * |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverID and for the domain associated to the provided baseDN. |
| | | * provided serverId and for the domain associated to the provided baseDN. |
| | | * @param replSessionSecurity The session security configuration. |
| | | * @param groupId The group id of our domain. |
| | | */ |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval, |
| | | long generationId, ReplSessionSecurity replSessionSecurity) |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, DN baseDn, short serverId, int maxReceiveQueue, |
| | | int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, |
| | | long heartbeatInterval, long generationId, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId) |
| | | { |
| | | this.replicationDomain = replicationDomain; |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | | this.serverId = serverId; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.generationId = generationId; |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the group id of the RS we are connected to. |
| | | * @return The group id of the RS we are connected to |
| | | */ |
| | | public byte getRsGroupId() |
| | | { |
| | | return rsGroupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server id of the RS we are connected to. |
| | | * @return The server id of the RS we are connected to |
| | | */ |
| | | public short getRsServerId() |
| | | { |
| | | return rsServerId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server id. |
| | | * @return The server id |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server url of the RS we are connected to. |
| | | * @return The server url of the RS we are connected to |
| | | */ |
| | | public String getRsServerUrl() |
| | | { |
| | | return rsServerUrl; |
| | | } |
| | | |
| | | /** |
| | | * Bag class for keeping info we get from a server in order to compute the |
| | | * best one to connect to. |
| | | */ |
| | | public static class ServerInfo |
| | | { |
| | | |
| | | private ServerState serverState = null; |
| | | private byte groupId = (byte) -1; |
| | | |
| | | /** |
| | | * Constructor. |
| | | * @param serverState Server state of the RS |
| | | * @param groupId Group id of the RS |
| | | */ |
| | | public ServerInfo(ServerState serverState, byte groupId) |
| | | { |
| | | this.serverState = serverState; |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server state. |
| | | * @return The server state |
| | | */ |
| | | public ServerState getServerState() |
| | | { |
| | | return serverState; |
| | | } |
| | | |
| | | /** |
| | | * get the group id. |
| | | * @return The group id |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * Handshake sequences between a DS and a RS is divided into 2 logical |
| | | * consecutive phases (phase 1 and phase 2). DS always initiates connection |
| | | * and always sends first message: |
| | | * |
| | | * DS<->RS: |
| | | * ------- |
| | | * |
| | | * phase 1: |
| | | * DS --- ServerStartMsg ---> RS |
| | | * DS <--- ReplServerStartMsg --- RS |
| | | * phase 2: |
| | | * DS --- StartSessionMsg ---> RS |
| | | * DS <--- TopologyMsg --- RS |
| | | * |
| | | * Before performing a full handshake sequence, DS searches for best suitable |
| | | * RS by making only phase 1 handshake to every RS he knows then closing |
| | | * connection. This allows to gather information on available RSs and then |
| | | * decide with which RS the full handshake (phase 1 then phase 2) will be |
| | | * finally performed. |
| | | * |
| | | * @throws NumberFormatException address was invalid |
| | | */ |
| | | private void connect() |
| | | { |
| | | HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); |
| | | HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>(); |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | { |
| | | // If a first connect or a connection failure occur, we go through here. |
| | | // force status machine to NOT_CONNECTED_STATUS so that monitoring can |
| | | // see that we are not connected. |
| | | replicationDomain.toNotConnectedStatus(); |
| | | } |
| | | |
| | | // Stop any existing poller and heartbeat monitor from a previous session. |
| | | stopSameGroupIdPoller(); |
| | | stopHeartBeat(); |
| | | |
| | | boolean newServerWithSameGroupId = false; |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | /* |
| | |
| | | for (String server : servers) |
| | | { |
| | | // Connect to server and get reply message |
| | | ReplServerStartMessage replServerStartMsg = |
| | | performHandshake(server, false); |
| | | tmpReadableServerName = null; // Not needed now |
| | | ReplServerStartMsg replServerStartMsg = |
| | | performPhaseOneHandshake(server, false); |
| | | |
| | | // Store reply message in list |
| | | // Store reply message info in list |
| | | if (replServerStartMsg != null) |
| | | { |
| | | ServerState rsState = replServerStartMsg.getServerState(); |
| | | rsStates.put(server, rsState); |
| | | ServerInfo serverInfo = |
| | | new ServerInfo(replServerStartMsg.getServerState(), |
| | | replServerStartMsg.getGroupId()); |
| | | rsInfos.put(server, serverInfo); |
| | | } |
| | | } // for servers |
| | | |
| | | ReplServerStartMessage replServerStartMsg = null; |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | |
| | | if (rsStates.size() > 0) |
| | | if (rsInfos.size() > 0) |
| | | { |
| | | |
| | | // At least one server answered, find the best one. |
| | | String bestServer = computeBestReplicationServer(state, rsStates, |
| | | serverID, baseDn); |
| | | String bestServer = computeBestReplicationServer(state, rsInfos, |
| | | serverId, baseDn, groupId); |
| | | |
| | | // Best found, now connect to this one |
| | | replServerStartMsg = performHandshake(bestServer, true); |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | replServerStartMsg = performPhaseOneHandshake(bestServer, true); |
| | | |
| | | if (replServerStartMsg != null) |
| | | if (replServerStartMsg != null) // Handshake phase 1 exchange went well |
| | | |
| | | { |
| | | try |
| | | ServerInfo bestServerInfo = rsInfos.get(bestServer); |
| | | |
| | | // Compute in which status we are starting the session to tell the RS |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(replServerStartMsg.getGenerationId(), |
| | | bestServerInfo.getServerState(), |
| | | replServerStartMsg.getDegradedStatusThreshold(), generationId); |
| | | |
| | | // Perfom session start (handshake phase 2) |
| | | TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer, |
| | | initStatus); |
| | | |
| | | if (topologyMsg != null) // Handshake phase 2 exchange went well |
| | | |
| | | { |
| | | /* |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replServerStartMsg.getServerState().getMaxChangeNumber(serverID); |
| | | |
| | | if (replServerMaxChangeNumber == null) |
| | | try |
| | | { |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | } |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverID); |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | |
| | | // Replication server is missing some of our changes: let's send |
| | | // them to him. |
| | | replayOperations.clear(); |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | * If we just connected to a RS with a different group id than us |
| | | * (because for instance a RS with our group id was unreachable |
| | | * while connecting to each RS) but the just received TopologyMsg |
| | | * shows that in the same time a RS with our group id connected, |
| | | * we must give up the connection to force reconnection that will |
| | | * certainly go back to a server with our group id as server with |
| | | * our group id have a greater priority for connection (in |
| | | * computeBestReplicationServer). In other words, we disconnect to |
| | | * connect to a server with our group id. If a server with our |
| | | * group id comes back later in the topology, we will be advised |
| | | * upon reception of a new TopologyMsg message and we will force |
| | | * reconnection at that time to retrieve a server with our group |
| | | * id. |
| | | */ |
| | | InternalSearchOperation op = searchForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | byte tmpRsGroupId = bestServerInfo.getGroupId(); |
| | | boolean someServersWithSameGroupId = |
| | | hasSomeServerWithSameGroupId(topologyMsg.getRsList()); |
| | | |
| | | // Really no other server with our group id ? |
| | | if ((tmpRsGroupId == groupId) || |
| | | ((tmpRsGroupId != groupId) && !someServersWithSameGroupId)) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start acepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | * We must not publish changes to a replicationServer that has |
| | | * not seen all our previous changes because this could cause |
| | | * some other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replServerStartMsg.getServerState(). |
| | | getMaxChangeNumber(serverId); |
| | | |
| | | if (replServerMaxChangeNumber == null) |
| | | { |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverId); |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = searchForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start acepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | ChangeNumber cn = replayOp.getChangeNumber(); |
| | | /* |
| | | * Because the entry returned by the search operation |
| | | * can contain old historical information, it is |
| | | * possible that some of the FakeOperation are |
| | | * actually older than the |
| | | * Only send the Operation if it was newer than |
| | | * the last ChangeNumber known by the Replication Server. |
| | | */ |
| | | if (cn.newer(replServerMaxChangeNumber)) |
| | | { |
| | | message = |
| | | DEBUG_SENDING_CHANGE.get( |
| | | replayOp.getChangeNumber().toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | } |
| | | replayOperations.clear(); |
| | | } |
| | | |
| | | replicationServer = tmpReadableServerName; |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | rsGroupId = replServerStartMsg.getGroupId(); |
| | | rsServerId = replServerStartMsg.getServerId(); |
| | | rsServerUrl = bestServer; |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | { |
| | | replicationDomain.setInitialStatus(initStatus); |
| | | replicationDomain.receiveTopo(topologyMsg); |
| | | } |
| | | connected = true; |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | // Connected to replication server with wrong group id: |
| | | // warn user and start poller to recover when a server with |
| | | // right group id arrives... |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Short.toString(rsServerId), |
| | | bestServer, Byte.toString(getRsGroupId()), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(message); |
| | | startSameGroupIdPoller(); |
| | | } |
| | | startHeartBeat(); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber(). |
| | | toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | // Detected new RS with our group id: log disconnection to |
| | | // inform administrator |
| | | Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get( |
| | | Byte.toString(groupId), baseDn.toString(), |
| | | Short.toString(serverId)); |
| | | logError(message); |
| | | // Do not log connection error |
| | | newServerWithSameGroupId = true; |
| | | } |
| | | replayOperations.clear(); |
| | | } |
| | | |
| | | replicationServer = tmpReadableServerName; |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | } catch (IOException e) |
| | | { |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | | { |
| | | if (connected == false) |
| | | } catch (IOException e) |
| | | { |
| | | if (session != null) |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | | try |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | } |
| | | session = null; |
| | | } |
| | | } |
| | | } |
| | | } // Could perform handshake with best |
| | | } // Could perform handshake phase 2 with best |
| | | |
| | | } // Could perform handshake phase 1 with best |
| | | |
| | | } // Reached some servers |
| | | |
| | | if (connected) |
| | |
| | | Message message = |
| | | NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | Short.toString(rsServerId), |
| | | replicationServer, |
| | | Short.toString(serverId), |
| | | Long.toString(this.generationId)); |
| | | logError(message); |
| | | } else |
| | |
| | | * This server could not find any replicationServer. It's going to start |
| | | * in degraded mode. Log a message. |
| | | */ |
| | | if (!connectionError) |
| | | if (!connectionError && !newServerWithSameGroupId) |
| | | { |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Connect to the provided server performing the handshake (start messages |
| | | * exchange) and return the reply message from the replication server. |
| | | * Has the passed RS info list some servers with our group id ? |
| | | * @return true if at least one server has the same group id |
| | | */ |
| | | private boolean hasSomeServerWithSameGroupId(List<RSInfo> rsInfos) |
| | | { |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | if (rsInfo.getGroupId() == this.groupId) |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Determines the status we are starting with according to our state and the |
| | | * RS state. |
| | | * |
| | | * @param rsGenId The generation id of the RS |
| | | * @param rsState The server state of the RS |
| | | * @param degradedStatusThreshold The degraded status threshold of the RS |
| | | * @param dsGenId The local generation id |
| | | * @return The initial status |
| | | */ |
| | | public ServerStatus computeInitialServerStatus(long rsGenId, |
| | | ServerState rsState, int degradedStatusThreshold, long dsGenId) |
| | | { |
| | | if (rsGenId == -1) |
| | | { |
| | | // RS has no generation id |
| | | return ServerStatus.NORMAL_STATUS; |
| | | } else |
| | | { |
| | | if (rsGenId == dsGenId) |
| | | { |
| | | // DS and RS have same generation id |
| | | |
| | | // Determine if we are late or not to replay changes. RS uses a |
| | | // threshold value for pending changes to be replayed by a DS to |
| | | // determine if the DS is in normal status or in degraded status. |
| | | // Let's compare the local and remote server state using this threshold |
| | | // value to determine if we are late or not |
| | | |
| | | ServerStatus initStatus = ServerStatus.INVALID_STATUS; |
| | | int nChanges = ServerState.diffChanges(rsState, state); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDn.toNormalizedString() + |
| | | " and with server id " + Short.toString(serverId) + " computed " + |
| | | Integer.toString(nChanges) + " changes late."); |
| | | } |
| | | |
| | | // Check status to know if it is relevant to change the status. Do not |
| | | // take RSD lock to test. If we attempt to change the status whereas |
| | | // we are in a status that do not allows that, this will be noticed by |
| | | // the changeStatusFromStatusAnalyzer method. This allows to take the |
| | | // lock roughly only when needed versus every sleep time timeout. |
| | | if (degradedStatusThreshold > 0) |
| | | { |
| | | if (nChanges >= degradedStatusThreshold) |
| | | { |
| | | initStatus = ServerStatus.DEGRADED_STATUS; |
| | | } else |
| | | { |
| | | initStatus = ServerStatus.NORMAL_STATUS; |
| | | } |
| | | } else |
| | | { |
| | | // 0 threshold value means no degrading system used (no threshold): |
| | | // force normal status |
| | | initStatus = ServerStatus.NORMAL_STATUS; |
| | | } |
| | | |
| | | return initStatus; |
| | | } else |
| | | { |
| | | // DS and RS do not have same generation id |
| | | return ServerStatus.BAD_GEN_ID_STATUS; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Connect to the provided server performing the first phase handshake |
| | | * (start messages exchange) and return the reply message from the replication |
| | | * server. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * @return The ReplServerStartMessage the server replied. Null if could not |
| | | * Use true if want to perform handshake phase 2 with the same session |
| | | * and keep the session to create as the current one. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | public ReplServerStartMessage performHandshake(String server, |
| | | private ReplServerStartMsg performPhaseOneHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplServerStartMessage replServerStartMsg = null; |
| | | ReplServerStartMsg replServerStartMsg = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | ProtocolSession localSession = null; |
| | | |
| | | boolean error = false; |
| | | try |
| | |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | if (keepConnection) |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | session = replSessionSecurity.createClientSession(server, socket); |
| | | localSession = replSessionSecurity.createClientSession(server, socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | * Send our ServerStartMsg. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow * 2, heartbeatInterval, state, |
| | | protocolVersion, generationId, isSslEncryption, !keepConnection); |
| | | session.publish(msg); |
| | | ProtocolVersion.getCurrentVersion(), generationId, isSslEncryption, |
| | | groupId); |
| | | localSession.publish(serverStartMsg); |
| | | |
| | | /* |
| | | * Read the ReplServerStartMessage that should come back. |
| | | * Read the ReplServerStartMsg that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | replServerStartMsg = (ReplServerStartMessage) session.receive(); |
| | | replServerStartMsg = (ReplServerStartMsg) localSession.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + |
| | | "\nAND RECEIVED:\n" + replServerStartMsg.toString()); |
| | | } |
| | | |
| | | // Sanity check |
| | | DN repDn = replServerStartMsg.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn.toString()); |
| | | logError(message); |
| | | error = true; |
| | | } |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | if (keepConnection) |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | localSession.stopEncryption(); |
| | | } |
| | | } catch (ConnectException e) |
| | | { |
| | |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | logError(message); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(message.toString()); |
| | | } |
| | | } |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION.get( |
| | | if ( (e instanceof SocketTimeoutException) && debugEnabled() ) |
| | | { |
| | | TRACER.debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn.toNormalizedString()); |
| | | } |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | | logError(message); |
| | | } else if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(message.toString()); |
| | | } |
| | | error = true; |
| | | } |
| | | |
| | | // Close session if requested |
| | | if (!keepConnection || error) |
| | | { |
| | | if (session != null) |
| | | if (localSession != null) |
| | | { |
| | | try |
| | | { |
| | | session.close(); |
| | | localSession.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | localSession = null; |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartMsg = null; |
| | | } // Be sure to return null. |
| | | |
| | | } |
| | | |
| | | // If this connection as the one to use for sending and receiving updates, |
| | | // store it. |
| | | if (keepConnection) |
| | | { |
| | | session = localSession; |
| | | } |
| | | |
| | | return replServerStartMsg; |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | | * replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @param initStatus The status we are starting with |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private TopologyMsg performPhaseTwoHandshake(String server, |
| | | ServerStatus initStatus) |
| | | { |
| | | TopologyMsg topologyMsg = null; |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Send our StartSessionMsg. |
| | | */ |
| | | StartSessionMsg startSessionMsg = null; |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | { |
| | | startSessionMsg = new StartSessionMsg(initStatus, |
| | | replicationDomain.getRefUrls(), |
| | | replicationDomain.isAssured(), |
| | | replicationDomain.getAssuredMode(), |
| | | replicationDomain.getAssuredSdLevel()); |
| | | } else |
| | | { |
| | | startSessionMsg = |
| | | new StartSessionMsg(initStatus, new ArrayList<String>()); |
| | | } |
| | | session.publish(startSessionMsg); |
| | | |
| | | /* |
| | | * Read the TopologyMsg that should come back. |
| | | */ |
| | | topologyMsg = (TopologyMsg) session.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + |
| | | "\nRB HANDSHAKE SENT:\n" + startSessionMsg.toString() + |
| | | "\nAND RECEIVED:\n" + topologyMsg.toString()); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | if (session != null) |
| | | { |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException ex) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | } |
| | | // Be sure to return null. |
| | | topologyMsg = null; |
| | | } |
| | | return topologyMsg; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication server that best fits our need so that we can |
| | | * connect to it. |
| | | * This methods performs some filtering on the group id, then call |
| | | * the real search for best server algorithm (searchForBestReplicationServer). |
| | | * |
| | | * Note: this method put as public static for unit testing purpose. |
| | | * |
| | | * @param myState The local server state. |
| | | * @param rsInfos The list of available replication servers and their |
| | | * associated information (choice will be made among them). |
| | | * @param serverId The server id for the suffix we are working for. |
| | | * @param baseDn The suffix for which we are working for. |
| | | * @param groupId The groupId we prefer being connected to if possible |
| | | * @return The computed best replication server. |
| | | */ |
| | | public static String computeBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn, |
| | | byte groupId) |
| | | { |
| | | /* |
| | | * Preference is given to servers with the requested group id: |
| | | * If there are some servers with the requested group id in the provided |
| | | * server list, then we run the search algorithm only on them. If no server |
| | | * with the requested group id, consider all of them. |
| | | */ |
| | | |
| | | // Filter for servers with same group id |
| | | HashMap<String, ServerInfo> sameGroupIdRsInfos = |
| | | new HashMap<String, ServerInfo>(); |
| | | |
| | | for (String repServer : rsInfos.keySet()) |
| | | { |
| | | ServerInfo serverInfo = rsInfos.get(repServer); |
| | | if (serverInfo.getGroupId() == groupId) |
| | | sameGroupIdRsInfos.put(repServer, serverInfo); |
| | | } |
| | | |
| | | // Some servers with same group id ? |
| | | if (sameGroupIdRsInfos.size() > 0) |
| | | { |
| | | return searchForBestReplicationServer(myState, sameGroupIdRsInfos, |
| | | serverId, baseDn); |
| | | } else |
| | | { |
| | | return searchForBestReplicationServer(myState, rsInfos, |
| | | serverId, baseDn); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication server that best fits our need so that we can |
| | | * connect to it. |
| | | * |
| | | * Note: this method put as public static for unit testing purpose. |
| | | * |
| | | * @param myState The local server state. |
| | | * @param rsStates The list of available replication servers and their |
| | | * associated server state. |
| | | * @param rsInfos The list of available replication servers and their |
| | | * associated information (choice will be made among them). |
| | | * @param serverId The server id for the suffix we are working for. |
| | | * @param baseDn The suffix for which we are working for. |
| | | * @return The computed best replication server. |
| | | */ |
| | | public static String computeBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerState> rsStates, short serverId, DN baseDn) |
| | | private static String searchForBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn) |
| | | { |
| | | |
| | | /* |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | |
| | | */ |
| | | |
| | | // Should never happen (sanity check) |
| | | if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) || |
| | | if ((myState == null) || (rsInfos == null) || (rsInfos.size() < 1) || |
| | | (baseDn == null)) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | // Shortcut, if only one server, this is the best |
| | | if (rsInfos.size() == 1) |
| | | { |
| | | for (String repServer : rsInfos.keySet()) |
| | | return repServer; |
| | | } |
| | | |
| | | String bestServer = null; |
| | | // Servers up to dates with regard to our changes |
| | | HashMap<String, ServerState> upToDateServers = |
| | |
| | | { |
| | | myChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | for (String repServer : rsStates.keySet()) |
| | | for (String repServer : rsInfos.keySet()) |
| | | { |
| | | |
| | | ServerState rsState = rsStates.get(repServer); |
| | | ServerState rsState = rsInfos.get(repServer).getServerState(); |
| | | ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId); |
| | | if (rsChangeNumber == null) |
| | | { |
| | |
| | | |
| | | Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( |
| | | upToDateServers.size(), |
| | | baseDn.toNormalizedString()); |
| | | baseDn.toNormalizedString(), |
| | | Short.toString(serverId)); |
| | | logError(message); |
| | | |
| | | /* |
| | |
| | | if ((minShift < 0) // First time in loop |
| | | || (shift < minShift)) |
| | | { |
| | | // This sever is even closer to topo state |
| | | // This server is even closer to topo state |
| | | bestServer = upServer; |
| | | minShift = shift; |
| | | } |
| | |
| | | minShift = tmpShift; |
| | | } |
| | | } // For late servers |
| | | |
| | | } |
| | | return bestServer; |
| | | } |
| | |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | attrs.add(Historical.ENTRYUIDNAME); |
| | | attrs.add("*"); |
| | | return conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on RS " + |
| | | getReplicationServer() + " " + rsServerId + " for " + baseDn + |
| | | " in DS " + serverId, |
| | | session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Starts the same group id poller. |
| | | */ |
| | | private void startSameGroupIdPoller() |
| | | { |
| | | sameGroupIdPoller = new SameGroupIdPoller(); |
| | | sameGroupIdPoller.start(); |
| | | } |
| | | |
| | | /** |
| | | * Stops the same group id poller. |
| | | */ |
| | | private void stopSameGroupIdPoller() |
| | | { |
| | | if (sameGroupIdPoller != null) |
| | | { |
| | | sameGroupIdPoller.shutdown(); |
| | | sameGroupIdPoller.waitForShutdown(); |
| | | sameGroupIdPoller = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stop the heartbeat monitor thread. |
| | | */ |
| | | void stopHeartBeat() |
| | |
| | | } |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | // ignore |
| | | } |
| | | |
| | | if (failingSession == session) |
| | | { |
| | | this.connected = false; |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | } |
| | | while (!this.connected && (!this.shutdown)) |
| | | { |
| | |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // ignore |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | |
| | | * Publish a message to the other servers. |
| | | * @param msg the message to publish |
| | | */ |
| | | public void publish(ReplicationMessage msg) |
| | | public void publish(ReplicationMsg msg) |
| | | { |
| | | boolean done = false; |
| | | |
| | |
| | | { |
| | | // It was not possible to connect to any replication server. |
| | | // Since the operation was already processed, we have no other |
| | | // choice than to return without sending the ReplicationMessage |
| | | // choice than to return without sending the ReplicationMsg |
| | | // and relying on the resend procedure of the connect phase to |
| | | // fix the problem when we finally connect. |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() Publishing a " + |
| | | " message is not possible due to existing connection error."); |
| | | "message is not possible due to existing connection error."); |
| | | } |
| | | |
| | | return; |
| | |
| | | currentWindowSemaphore = sendWindow; |
| | | } |
| | | |
| | | if (msg instanceof UpdateMessage) |
| | | if (msg instanceof UpdateMsg) |
| | | { |
| | | // Acquiring the window credit must be done outside of the |
| | | // connectPhaseLock because it can be blocking and we don't |
| | |
| | | if (!credit) |
| | | { |
| | | // the window is still closed. |
| | | // Send a WindowProbe message to wakeup the receiver in case the |
| | | // Send a WindowProbeMsg message to wakeup the receiver in case the |
| | | // window update message was lost somehow... |
| | | // then loop to check again if connection was closed. |
| | | session.publish(new WindowProbe()); |
| | | session.publish(new WindowProbeMsg()); |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | */ |
| | | public ReplicationMessage receive() throws SocketTimeoutException |
| | | public ReplicationMsg receive() throws SocketTimeoutException |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | |
| | | ProtocolSession failingSession = session; |
| | | try |
| | | { |
| | | ReplicationMessage msg = session.receive(); |
| | | if (msg instanceof WindowMessage) |
| | | ReplicationMsg msg = session.receive(); |
| | | if (msg instanceof WindowMsg) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | return msg; |
| | | } |
| | |
| | | if (shutdown == false) |
| | | { |
| | | Message message = |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer, |
| | | Short.toString(rsServerId), baseDn.toString(), |
| | | Short.toString(serverId)); |
| | | logError(message); |
| | | |
| | | debugInfo("ReplicationBroker.receive() " + baseDn + |
| | | " Exception raised." + e + e.getLocalizedMessage()); |
| | | " Exception raised: " + e.getLocalizedMessage()); |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | |
| | | rcvWindow--; |
| | | if ((rcvWindow < halfRcvWindow) && (session != null)) |
| | | { |
| | | session.publish(new WindowMessage(halfRcvWindow)); |
| | | session.publish(new WindowMsg(halfRcvWindow)); |
| | | rcvWindow += halfRcvWindow; |
| | | } |
| | | } catch (IOException e) |
| | |
| | | */ |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker " + serverId + " is stopping and will" + |
| | | " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDn); |
| | | } |
| | | stopSameGroupIdPoller(); |
| | | stopHeartBeat(); |
| | | replicationServer = "stopped"; |
| | | shutdown = true; |
| | | connected = false; |
| | | rsGroupId = (byte) -1; |
| | | rsServerId = -1; |
| | | rsServerUrl = null; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker is stopping. and will" + |
| | | " close the connection"); |
| | | } |
| | | |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | |
| | | * With this option set to a non-zero value, calls to the receive() method |
| | | * block for only this amount of time after which a |
| | | * java.net.SocketTimeoutException is raised. |
| | | * The Broker is valid and useable even after such an Exception is raised. |
| | | * The Broker is valid and usable even after such an Exception is raised. |
| | | * |
| | | * @param timeout the specified timeout, in milliseconds. |
| | | * @throws SocketException if there is an error in the underlying protocol, |
| | |
| | | SearchResultEntry searchEntry) |
| | | { |
| | | /* |
| | | * Only deal with modify operation so far |
| | | * TODO : implement code for ADD, DEL, MODDN operation |
| | | * |
| | | * Parse all ds-sync-hist attribute values |
| | | * - for each Changenumber > replication server MaxChangeNumber : |
| | | * build an attribute mod |
| | | * |
| | | * This call back is called at session establishment phase |
| | | * for each entry that has been changed by this server and the changes |
| | | * have not been sent to any Replication Server. |
| | | * The role of this method is to build equivalent operation from |
| | | * the historical information and add them in the replayOperations |
| | | * table. |
| | | */ |
| | | Iterable<FakeOperation> updates = |
| | | Historical.generateFakeOperations(searchEntry); |
| | |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultReference searchReference) |
| | | { |
| | | // TODO to be implemented |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isConnected() |
| | | { |
| | | return !connectionError; |
| | | return connected; |
| | | } |
| | | |
| | | private boolean debugEnabled() |
| | |
| | | } |
| | | return isEncrypted; |
| | | } |
| | | |
| | | /** |
| | | * In case we are connected to a RS with a different group id, we use this |
| | | * thread to poll presence of a RS with the same group id as ours. If a RS |
| | | * with the same group id is available, we close the session to force |
| | | * reconnection. Reconnection will choose a server with the same group id. |
| | | */ |
| | | private class SameGroupIdPoller extends DirectoryThread |
| | | { |
| | | |
| | | private boolean sameGroupIdPollershutdown = false; |
| | | private boolean terminated = false; |
| | | // Sleep interval in ms |
| | | private static final int SAME_GROUP_ID_POLLER_PERIOD = 5000; |
| | | |
public SameGroupIdPoller()
{
  // Thread name identifies the suffix, the wanted group id and this server.
  super(new StringBuilder("Replication Broker Same Group Id Poller for ")
    .append(baseDn.toString())
    .append(" and group id ").append(groupId)
    .append(" in server id ").append(serverId)
    .toString());
}
| | | |
| | | /** |
| | | * Wait for the completion of the same group id poller. |
| | | */ |
| | | public void waitForShutdown() |
| | | { |
| | | try |
| | | { |
| | | while (terminated == false) |
| | | { |
| | | Thread.sleep(50); |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | // exit the loop if this thread is interrupted. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the same group id poller. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | sameGroupIdPollershutdown = true; |
| | | } |
| | | |
| | | /** |
| | | * Permanently look for RS with our group id and if found, break current |
| | | * connection to force reconnection to a new server with the right group id. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | boolean done = false; |
| | | |
| | | while ((!done) && (!sameGroupIdPollershutdown)) |
| | | { |
| | | // Sleep some time between checks |
| | | try |
| | | { |
| | | Thread.sleep(SAME_GROUP_ID_POLLER_PERIOD); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // Stop as we are interrupted |
| | | sameGroupIdPollershutdown = true; |
| | | } |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Running SameGroupIdPoller for: " + |
| | | baseDn.toString()); |
| | | } |
| | | if (session != null) // Check only if not already disconnected |
| | | |
| | | { |
| | | for (String server : servers) |
| | | { |
| | | // Do not ask the RS we are connected to as it has for sure the |
| | | // wrong group id |
| | | if (server.equals(rsServerUrl)) |
| | | continue; |
| | | |
| | | // Connect to server and get reply message |
| | | ReplServerStartMsg replServerStartMsg = |
| | | performPhaseOneHandshake(server, false); |
| | | |
| | | // Store reply message info in list |
| | | if (replServerStartMsg != null) |
| | | { |
| | | if (groupId == replServerStartMsg.getGroupId()) |
| | | { |
| | | // Found one server with the same group id as us, disconnect |
| | | // session to force reconnection to a server with same group |
| | | // id. |
| | | Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get( |
| | | Byte.toString(groupId), baseDn.toString(), |
| | | Short.toString(serverId)); |
| | | logError(message); |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (Exception e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | done = true; // Terminates thread as did its job. |
| | | |
| | | break; |
| | | } |
| | | } |
| | | } // for server |
| | | |
| | | } |
| | | } |
| | | } |
| | | |
| | | terminated = true; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() + |
| | | " terminated."); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Signals the RS we just entered a new status. |
| | | * @param newStatus The status the local DS just entered |
| | | */ |
| | | public void signalStatusChange(ServerStatus newStatus) |
| | | { |
| | | try |
| | | { |
| | | |
| | | ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS, |
| | | newStatus); |
| | | session.publish(csMsg); |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDn.toNormalizedString(), |
| | | Short.toString(serverId), |
| | | ex.getLocalizedMessage() + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the group id of the broker. |
| | | * @param groupId The new group id. |
| | | */ |
| | | public void setGroupId(byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | } |