| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.ChangelogStartMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.HeartbeatThread; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | | * changelog server. |
| | | * replication server. |
| | | */ |
| | | public class ServerHandler extends MonitorProvider |
| | | { |
| | |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ChangelogCache changelogCache = null; |
| | | private ReplicationCache replicationCache = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of updates sent to the server |
| | | private int inCount = 0; // number of updates received from the server |
| | |
| | | // flow controlled and should |
| | | // be stopped from sending messages. |
| | | private int saturationCount = 0; |
| | | private short changelogId; |
| | | private short replicationServerId; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | |
| | | private static final Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | | private static final Map<ChangeNumber, ReplServerAckMessageList> |
| | | changelogsWaitingAcks = |
| | | new HashMap<ChangeNumber, ReplServerAckMessageList>(); |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | |
| | | |
| | | /** |
| | | * Do the exchange of start messages to know if the remote |
| | | * server is an LDAP or changelog server and to exchange serverID. |
| | | * server is an LDAP or replication server and to exchange serverID. |
| | | * Then create the reader and writer thread. |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * @param changelogId The identifier of the changelog that creates this |
| | | * server handler. |
| | | * @param changelogURL The URL of the changelog that creates this |
| | | * server handler. |
| | | * @param replicationServerId The identifier of the replicationServer that |
| | | * creates this server handler. |
| | | * @param replicationServerURL The URL of the replicationServer that creates |
| | | * this server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param changelog the Changelog that created this server handler. |
| | | * @param replicationServer the ReplicationServer that created this server |
| | | * handler. |
| | | */ |
| | | public void start(DN baseDn, short changelogId, String changelogURL, |
| | | int windowSize, Changelog changelog) |
| | | public void start(DN baseDn, short replicationServerId, |
| | | String replicationServerURL, |
| | | int windowSize, ReplicationServer replicationServer) |
| | | { |
| | | this.changelogId = changelogId; |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | |
| | | if (baseDn != null) |
| | | { |
| | | this.baseDn = baseDn; |
| | | changelogCache = changelog.getChangelogCache(baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage msg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | replicationCache = replicationServer.getReplicationCache(baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState); |
| | | |
| | | session.publish(msg); |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage myStartMsg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg instanceof ChangelogStartMessage) |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | { |
| | | ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | String[] splittedURL = serverURL.split(":"); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState serverState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage outMsg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, serverState); |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage outMsg = |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | | changelogCache.startServer(this); |
| | | replicationCache.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | changelogCache.startChangelog(this); |
| | | replicationCache.startReplicationServer(this); |
| | | } |
| | | |
| | | writer = new ServerWriter(session, serverId, this, changelogCache); |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | |
| | | reader = new ServerReader(session, serverId, this, |
| | | changelogCache); |
| | | replicationCache); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if the server associated to this ServerHandler is a changelog server. |
| | | * Check if the server associated to this ServerHandler is a replication |
| | | * server. |
| | | * @return true if the server associated to this ServerHandler is a |
| | | * changelog server. |
| | | * replication server. |
| | | */ |
| | | public boolean isChangelogServer() |
| | | public boolean isReplicationServer() |
| | | { |
| | | return (!serverIsLDAPserver); |
| | | } |
| | |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = changelogCache.getDbServerState(); |
| | | ServerState dbState = replicationCache.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | |
| | | * Get an approximation of the delay by looking at the age of the oldest |
| | | * message that has not been sent to this server. |
| | | * This is an approximation because the age is calculated using the |
| | | * clock of the server where the changelog is currently running |
| | | * clock of the server where the replicationServer is currently running |
| | | * while it should be calculated using the clock of the server |
| | | * that originally processed the change. |
| | | * |
| | |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | changelogCache.checkAllSaturation(); |
| | | replicationCache.checkAllSaturation(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | * load this change on the delayList |
| | | * |
| | | */ |
| | | ChangelogIteratorComparator comparator = |
| | | new ChangelogIteratorComparator(); |
| | | SortedSet<ChangelogIterator> iteratorSortedSet = |
| | | new TreeSet<ChangelogIterator>(comparator); |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | /* fill the lateQueue */ |
| | | for (short serverId : changelogCache.getServers()) |
| | | for (short serverId : replicationCache.getServers()) |
| | | { |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | ChangelogIterator iterator = |
| | | changelogCache.getChangelogIterator(serverId, lastCsn); |
| | | ReplicationIterator iterator = |
| | | replicationCache.getChangelogIterator(serverId, lastCsn); |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | |
| | | } |
| | | while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) |
| | | { |
| | | ChangelogIterator iterator = iteratorSortedSet.first(); |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | |
| | | else |
| | | iterator.releaseCursor(); |
| | | } |
| | | for (ChangelogIterator iterator : iteratorSortedSet) |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | changelogCache.sendAck(changeNumber, true); |
| | | replicationCache.sendAck(changeNumber, true); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Process reception of an ack for an update that was received from a |
| | | * Changelog Server. |
| | | * ReplicationServer. |
| | | * |
| | | * @param message the ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | |
| | | public static void ackChangelog(AckMessage message, short ackingServerId) |
| | | { |
| | | ChangeNumber changeNumber = message.getChangeNumber(); |
| | | ChangelogAckMessageList ackList; |
| | | ReplServerAckMessageList ackList; |
| | | boolean completedFlag; |
| | | synchronized (changelogsWaitingAcks) |
| | | { |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | ChangelogCache changelogCache = ackList.getChangelogCache(); |
| | | changelogCache.sendAck(changeNumber, false, |
| | | ackList.getChangelogServerId()); |
| | | ReplicationCache replicationCache = ackList.getChangelogCache(); |
| | | replicationCache.sendAck(changeNumber, false, |
| | | ackList.getReplicationServerId()); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an update to the list of updates received from a changelog server and |
| | | * Add an update to the list of updates received from a replicationServer and |
| | | * waiting for acks. |
| | | * |
| | | * @param update The update that must be added to the list. |
| | | * @param ChangelogServerId The identifier of the changelog that sent the |
| | | * update. |
| | | * @param changelogCache The ChangelogCache from which the change was |
| | | * processed and to which the ack must later be sent. |
| | | * @param ChangelogServerId The identifier of the replicationServer that sent |
| | | * the update. |
| | | * @param replicationCache The ReplicationCache from which the change was |
| | | * processed and to which the ack must later be sent. |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public static void addWaitingAck(UpdateMessage update, |
| | | short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck) |
| | | public static void addWaitingAck( |
| | | UpdateMessage update, |
| | | short ChangelogServerId, ReplicationCache replicationCache, |
| | | int nbWaitedAck) |
| | | { |
| | | ChangelogAckMessageList ackList = |
| | | new ChangelogAckMessageList(update.getChangeNumber(), |
| | | ReplServerAckMessageList ackList = |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, changelogCache); |
| | | ChangelogServerId, replicationCache); |
| | | synchronized(changelogsWaitingAcks) |
| | | { |
| | | changelogsWaitingAcks.put(update.getChangeNumber(), ackList); |
| | |
| | | * Check type of server handled. |
| | | * |
| | | * @return true if the handled server is an LDAP server. |
| | | * false if the handled server is a changelog server |
| | | * false if the handled server is a replicationServer |
| | | */ |
| | | public boolean isLDAPserver() |
| | | { |
| | |
| | | if (serverIsLDAPserver) |
| | | return "LDAP Server " + str; |
| | | else |
| | | return "Changelog Server " + str; |
| | | return "Replication Server " + str; |
| | | } |
| | | |
| | | /** |
| | |
| | | if (serverIsLDAPserver) |
| | | attributes.add(new Attribute("LDAP-Server", serverURL)); |
| | | else |
| | | attributes.add(new Attribute("Changelog-Server", serverURL)); |
| | | attributes.add(new Attribute("ReplicationServer-Server", serverURL)); |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | |
| | | if (serverIsLDAPserver) |
| | | localString = "Directory Server "; |
| | | else |
| | | localString = "Changelog Server "; |
| | | localString = "Replication Server "; |
| | | |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | |
| | | { |
| | | if (flowControl) |
| | | { |
| | | if (changelogCache.restartAfterSaturation(this)) |
| | | if (replicationCache.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | |
| | | public void process(RoutableMessage msg) |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); |
| | | debugInfo("SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId); |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1); |
| | | "SH(" + replicationServerId + ") receives " + msg + |
| | | " from " + serverId, 1); |
| | | |
| | | changelogCache.process(msg, this); |
| | | replicationCache.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void send(RoutableMessage msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); |
| | | debugInfo("SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId); |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1); |
| | | "SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId, 1); |
| | | |
| | | session.publish(msg); |
| | | } |