| | |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Random; |
| | | import java.util.Set; |
| | |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | |
| | | private ProtocolSession session; |
| | | private final MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ReplicationServerDomain replicationServerDomain = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | | |
| | | private int inCount = 0; // number of updates received from the server |
| | | |
| | | private int inAckCount = 0; |
| | | private int outAckCount = 0; |
| | | private int maxReceiveQueue = 0; |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | |
| | | private ServerState serverState; |
| | | private boolean activeWriter = true; |
| | | private ServerWriter writer = null; |
| | | private DN baseDn = null; |
| | | private String baseDn = null; |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | private int maxRcvWindow; |
| | |
| | | * Properties filled only if remote server is a DS |
| | | */ |
| | | |
| | | // Status of this DS (only used if this server handler represents a DS) |
| | | private ServerStatus status = ServerStatus.INVALID_STATUS; |
| | | // Referrals URLs this DS is exporting |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | |
| | | * Set when ServerHandler is stopping. |
| | | */ |
| | | private AtomicBoolean shuttingDown = new AtomicBoolean(false); |
| | | private static final Map<ChangeNumber, ReplServerAckMessageList> |
| | | changelogsWaitingAcks = |
| | | new HashMap<ChangeNumber, ReplServerAckMessageList>(); |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | |
| | | * @param replicationServer the ReplicationServer that created this server |
| | | * handler. |
| | | */ |
| | | public void start(DN baseDn, short replicationServerId, |
| | | public void start(String baseDn, short replicationServerId, |
| | | String replicationServerURL, |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | |
| | | { |
| | | // Timeout |
| | | Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Short.toString(replicationServer.getServerId())); |
| | | closeSession(message); |
| | |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | // If the LDAP server has already sent changes |
| | | // it is not expected to connect to an empty RS |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( |
| | | this.baseDn.toNormalizedString(), |
| | | this.baseDn, |
| | | Short.toString(serverId), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of Ack received from the server managed by this handler. |
| | | * |
| | | * @return Returns the inAckCount. |
| | | */ |
| | | public int getInAckCount() |
| | | { |
| | |   // Plain read of the counter that incrementInAckCount() bumps. |
| | |   return this.inAckCount; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of Ack sent to the server managed by this handler. |
| | | * |
| | | * @return Returns the outAckCount. |
| | | */ |
| | | public int getOutAckCount() |
| | | { |
| | |   // Plain read; the counter is incremented after messages are published |
| | |   // on the session. |
| | |   return this.outAckCount; |
| | | } |
| | | |
| | | /** |
| | | * Check if this server is saturated (this server has already been |
| | | * sent a bunch of updates and has not processed them so they are staying |
| | | * in the message queue for this server and the size of the queue |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Builds an ack message for the given change and sends it to the server |
| | |  * represented by this object (the server that did the original |
| | |  * modification). |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the update that is acked. |
| | |  * @throws IOException In case of Exception thrown sending the ack. |
| | |  */ |
| | | public void sendAck(ChangeNumber changeNumber) throws IOException |
| | | { |
| | |   // Delegate so that publishing and counting live in a single place. |
| | |   sendAck(new AckMsg(changeNumber)); |
| | | } |
| | | |
| | | /** |
| | |  * Sends an ack message to the server represented by this object. |
| | |  * |
| | |  * @param ack The ack message to be sent. |
| | |  * @throws IOException In case of Exception thrown sending the ack. |
| | |  */ |
| | | public void sendAck(AckMsg ack) throws IOException |
| | | { |
| | |   session.publish(ack); |
| | |   // Reached only when publish() returned without throwing, so failed |
| | |   // sends are not counted. |
| | |   outAckCount++; |
| | | } |
| | | |
| | | /** |
| | | * Do the work when an ack message has been received from another server. |
| | | * |
| | | * @param message The ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | | */ |
| | | public void ack(AckMsg message, short ackingServerId) |
| | | { |
| | |   final ChangeNumber ackedChange = message.getChangeNumber(); |
| | |   synchronized (waitingAcks) |
| | |   { |
| | |     final AckMessageList pending = waitingAcks.get(ackedChange); |
| | |     if (pending == null) |
| | |     { |
| | |       // Nothing is waiting on this change: ignore the ack. |
| | |       return; |
| | |     } |
| | |     pending.addAck(ackingServerId); |
| | |     if (!pending.completed()) |
| | |     { |
| | |       // Still waiting for acks from other servers. |
| | |       return; |
| | |     } |
| | |     waitingAcks.remove(ackedChange); |
| | |   } |
| | |   // The final ack is forwarded only after the waitingAcks monitor has |
| | |   // been released. |
| | |   replicationServerDomain.sendAck(ackedChange, true); |
| | | } |
| | | |
| | | /** |
| | | * Process reception of an ack for an update that was received from a |
| | | * ReplicationServer. |
| | | * |
| | | * @param message the ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | | */ |
| | | public static void ackChangelog(AckMsg message, short ackingServerId) |
| | | { |
| | |   final ChangeNumber ackedChange = message.getChangeNumber(); |
| | |   final ReplServerAckMessageList pending; |
| | |   synchronized (changelogsWaitingAcks) |
| | |   { |
| | |     pending = changelogsWaitingAcks.get(ackedChange); |
| | |     if (pending == null) |
| | |     { |
| | |       // No entry registered for this change: nothing to do. |
| | |       return; |
| | |     } |
| | |     pending.addAck(ackingServerId); |
| | |     if (!pending.completed()) |
| | |     { |
| | |       // More acks are still expected for this change. |
| | |       return; |
| | |     } |
| | |     changelogsWaitingAcks.remove(ackedChange); |
| | |   } |
| | |   // All expected acks arrived: relay the ack through the domain recorded |
| | |   // in the list, outside of the changelogsWaitingAcks monitor. |
| | |   final ReplicationServerDomain domain = pending.getChangelogCache(); |
| | |   domain.sendAck(ackedChange, false, pending.getReplicationServerId()); |
| | | } |
| | | |
| | | /** |
| | | * Add an update to the list of update waiting for acks. |
| | | * |
| | | * @param update the update that must be added to the list |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public void addWaitingAck(UpdateMsg update, int nbWaitedAck) |
| | | { |
| | |   // The tracker is built before taking the monitor so that only the map |
| | |   // insertion happens under the lock. |
| | |   final AckMessageList pendingAcks = |
| | |       new AckMessageList(update.getChangeNumber(), nbWaitedAck); |
| | |   synchronized (waitingAcks) |
| | |   { |
| | |     waitingAcks.put(update.getChangeNumber(), pendingAcks); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Add an update to the list of update received from a replicationServer and |
| | | * waiting for acks. |
| | | * |
| | | * @param update The update that must be added to the list. |
| | | * @param ChangelogServerId The identifier of the replicationServer that sent |
| | | * the update. |
| | | * @param replicationServerDomain The ReplicationServerDomain from which the |
| | | * change was processed and to which the ack |
| | | * must later be sent. |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public static void addWaitingAck( |
| | | UpdateMsg update, |
| | | short ChangelogServerId, ReplicationServerDomain replicationServerDomain, |
| | | int nbWaitedAck) |
| | | { |
| | | ReplServerAckMessageList ackList = |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, |
| | | replicationServerDomain); |
| | | synchronized (changelogsWaitingAcks) |
| | | { |
| | | changelogsWaitingAcks.put(update.getChangeNumber(), ackList); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the size of the list of update waiting for acks. |
| | | * |
| | | * @return the size of the list of update waiting for acks. |
| | | */ |
| | | public int getWaitingAckSize() |
| | | { |
| | |   final int pendingCount; |
| | |   // Read the size under the same monitor used by the writers of the map. |
| | |   synchronized (waitingAcks) |
| | |   { |
| | |     pendingCount = waitingAcks.size(); |
| | |   } |
| | |   return pendingCount; |
| | | } |
| | | |
| | | /** |
| | | * Increment the count of Acks received from this server. |
| | | */ |
| | | public void incrementInAckCount() |
| | | { |
| | |   // One more ack received from the server handled by this object. |
| | |   inAckCount += 1; |
| | | } |
| | | |
| | | /** |
| | |
| | | .valueOf(serverId))); |
| | | attributes.add(Attributes.create("base-dn", baseDn.toString())); |
| | | |
| | | if (serverIsLDAPserver) |
| | | try |
| | | { |
| | | MonitorData md; |
| | | try |
| | | { |
| | | md = replicationServerDomain.getMonitorData(); |
| | | md = replicationServerDomain.getMonitorData(); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | | // Oldest missing update |
| | | Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0)) |
| | |
| | | "approx-older-change-not-synchronized", date.toString())); |
| | | attributes.add(Attributes.create( |
| | | "approx-older-change-not-synchronized-millis", String |
| | | .valueOf(approxFirstMissingDate))); |
| | | .valueOf(approxFirstMissingDate))); |
| | | } |
| | | |
| | | // Missing changes |
| | |
| | | attributes.add(Attributes.create("approximate-delay", String |
| | | .valueOf(delay))); |
| | | } |
| | | catch (Exception e) |
| | | else |
| | | { |
| | | // TODO: improve the log |
| | | // We failed retrieving the remote monitor data. |
| | | attributes.add(Attributes.create("error", |
| | | stackTraceToSingleLineString(e))); |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChangesRS(serverId); |
| | | attributes.add(Attributes.create("missing-changes", String |
| | | .valueOf(missingChanges))); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // TODO: improve the log |
| | | // We failed retrieving the remote monitor data. |
| | | attributes.add(Attributes.create("error", |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.count()))); |
| | |
| | | attributes.add(Attributes.create("update-received", String |
| | | .valueOf(getInCount()))); |
| | | |
| | | // Deprecated as long as assured is not exposed |
| | | attributes.add(Attributes.create("update-waiting-acks", String |
| | | .valueOf(getWaitingAckSize()))); |
| | | attributes.add(Attributes.create("ack-sent", String |
| | | .valueOf(getOutAckCount()))); |
| | | attributes.add(Attributes.create("ack-received", String |
| | | .valueOf(getInAckCount()))); |
| | | |
| | | // Window stats |
| | | attributes.add(Attributes.create("max-send-window", String |
| | | .valueOf(sendWindowSize))); |
| | |
| | | /* |
| | | * Stop the remote LSHandler |
| | | */ |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | synchronized (directoryServers) |
| | | { |
| | | lsh.stopHandler(); |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | } |
| | | directoryServers.clear(); |
| | | |
| | | /* |
| | | * Stop the heartbeat thread. |
| | |
| | | { |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | |
| | | */ |
| | | List<DSInfo> dsInfos = topoMsg.getDsList(); |
| | | |
| | | // Removes the existing structures |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | synchronized (directoryServers) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | // Removes the existing structures |
| | | for (LightweightServerHandler lsh : directoryServers.values()) |
| | | { |
| | | lsh.stopHandler(); |
| | | } |
| | | directoryServers.clear(); |
| | | |
| | | // Creates the new structure according to the message received. |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | LightweightServerHandler lsh = new LightweightServerHandler(this, |
| | | serverId, dsInfo.getDsId(), dsInfo.getGenerationId(), |
| | | dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(), |
| | | dsInfo.isAssured(), dsInfo.getAssuredMode(), |
| | | dsInfo.getSafeDataLevel()); |
| | | lsh.startHandler(); |
| | | directoryServers.put(lsh.getServerId(), lsh); |
| | | // Creates the new structure according to the message received. |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | LightweightServerHandler lsh = new LightweightServerHandler(this, |
| | | serverId, dsInfo.getDsId(), dsInfo.getGenerationId(), |
| | | dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(), |
| | | dsInfo.isAssured(), dsInfo.getAssuredMode(), |
| | | dsInfo.getSafeDataLevel()); |
| | | lsh.startHandler(); |
| | | directoryServers.put(lsh.getServerId(), lsh); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public boolean hasRemoteLDAPServers() |
| | | { |
| | |   // Read under the directoryServers monitor, as addDSInfos() does when it |
| | |   // walks the same map. The former unsynchronized return placed in front |
| | |   // of this block made the block unreachable and has been dropped. |
| | |   synchronized (directoryServers) |
| | |   { |
| | |     return !directoryServers.isEmpty(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |
| | | // TODO also log an error message. |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | } else |
| | | { |
| | | // Both the LDAP server and the replication server believes that the |
| | |
| | | */ |
| | | public Set<Short> getConnectedDirectoryServerIds() |
| | | { |
| | |   synchronized (directoryServers) |
| | |   { |
| | |     // NOTE(review): Map.keySet() normally returns a live view, so the |
| | |     // caller may still observe later changes to directoryServers after |
| | |     // this monitor is released - confirm whether a copy is wanted. |
| | |     return directoryServers.keySet(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Get the map of connected DSs |
| | |  * (to the RS represented by this server handler). |
| | |  * @return The map of connected DSs |
| | |  */ |
| | | public Map<Short, LightweightServerHandler> getConnectedDSs() |
| | | { |
| | |   // Hands out the internal map itself, not a copy. |
| | |   return directoryServers; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return protocolVersion; |
| | | } |
| | | |
| | | /** |
| | | * Add the DSinfos of the connected Directory Servers |
| | | * to the List of DSInfo provided as a parameter. |
| | | * |
| | | * @param dsInfos The List of DSInfo that should be updated |
| | | * with the DSInfo for the directoryServers |
| | | * connected to this ServerHandler. |
| | | */ |
| | | public void addDSInfos(List<DSInfo> dsInfos) |
| | | { |
| | |   // Hold the directoryServers monitor for the whole traversal. |
| | |   synchronized (directoryServers) |
| | |   { |
| | |     for (LightweightServerHandler handler : directoryServers.values()) |
| | |     { |
| | |       final DSInfo info = handler.toDSInfo(); |
| | |       dsInfos.add(info); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Gets the group id of the server represented by this object. |
| | | * @return The group id of the server represented by this object. |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | |   // Simple accessor: no locking, returns the current field value. |
| | |   return this.groupId; |
| | | } |
| | | } |