| | |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ReplicationCache replicationCache = null; |
| | | private ReplicationServerDomain replicationServerDomain = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | | private int inCount = 0; // number of updates received from the server |
| | |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(baseDn, true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | // Get or Create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | // Get or Create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ServerState localServerState = |
| | | replicationServerDomain.getDbServerState(); |
| | | // This an incoming connection. Publish our start message |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | Set<String> lss = |
| | | replicationServerDomain.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName() + |
| | | ", SH received START from LS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | |
| | | } |
| | | else |
| | | { |
| | | replicationCache.setGenerationId(generationId, false); |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | } |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = replicationServer. |
| | | getReplicationServerDomain(this.baseDn, true); |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | ServerState serverState = replicationServerDomain.getDbServerState(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | | sslEncryption = receivedMsg.getSSLEncryption(); |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | Set<String> lss = |
| | | replicationServerDomain.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationServerDomain. |
| | | getReplicationServer().getMonitorInstanceName() + |
| | | ", SH received START from RS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationCache.getGenerationIdSavedStatus()) |
| | | if (replicationServerDomain.getGenerationIdSavedStatus()) |
| | | { |
| | | // it the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non nul state and |
| | | // we have a nul state ? |
| | | // replicationCache.setGenerationId(generationId, false); |
| | | // replicationServerDomain. |
| | | // setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | |
| | | else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | replicationCache.setGenerationId(generationId, false); |
| | | replicationServerDomain.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | } |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | // Get or create the ReplicationServerDomain |
| | | replicationServerDomain = replicationServer. |
| | | getReplicationServerDomain(this.baseDn,true); |
| | | |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | | { |
| | | started = replicationCache.startServer(this); |
| | | started = replicationServerDomain.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | started = replicationCache.startReplicationServer(this); |
| | | started = replicationServerDomain.startReplicationServer(this); |
| | | } |
| | | |
| | | if (started) |
| | |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | reader = new ServerReader(session, serverId, this, replicationCache); |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | reader = new ServerReader(session, serverId, |
| | | this, replicationServerDomain); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | } |
| | |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = replicationCache.getDbServerState(); |
| | | ServerState dbState = replicationServerDomain.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | |
| | | * Ignore updates from a server that is degraded due to |
| | | * its inconsistent generationId |
| | | */ |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | long referenceGenerationId = replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != generationId)) |
| | | { |
| | |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | replicationCache.checkAllSaturation(); |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | /* fill the lateQueue */ |
| | | for (short serverId : replicationCache.getServers()) |
| | | for (short serverId : replicationServerDomain.getServers()) |
| | | { |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | ReplicationIterator iterator = |
| | | replicationCache.getChangelogIterator(serverId, lastCsn); |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | replicationCache.sendAck(changeNumber, true); |
| | | replicationServerDomain.sendAck(changeNumber, true); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | ReplicationCache replicationCache = ackList.getChangelogCache(); |
| | | replicationCache.sendAck(changeNumber, false, |
| | | ReplicationServerDomain replicationServerDomain = |
| | | ackList.getChangelogCache(); |
| | | replicationServerDomain.sendAck(changeNumber, false, |
| | | ackList.getReplicationServerId()); |
| | | } |
| | | } |
| | |
| | | * @param update The update that must be added to the list. |
| | | * @param ChangelogServerId The identifier of the replicationServer that sent |
| | | * the update. |
| | | * @param replicationCache The ReplicationCache from which the change was |
| | | * processed and to which the ack must later be sent. |
| | | * @param replicationServerDomain The ReplicationServerDomain from which the |
| | | * change was processed and to which the ack |
| | | * must later be sent. |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public static void addWaitingAck( |
| | | UpdateMessage update, |
| | | short ChangelogServerId, ReplicationCache replicationCache, |
| | | short ChangelogServerId, ReplicationServerDomain replicationServerDomain, |
| | | int nbWaitedAck) |
| | | { |
| | | ReplServerAckMessageList ackList = |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, replicationCache); |
| | | ChangelogServerId, |
| | | replicationServerDomain); |
| | | synchronized(changelogsWaitingAcks) |
| | | { |
| | | changelogsWaitingAcks.put(update.getChangeNumber(), ackList); |
| | |
| | | { |
| | | if (flowControl) |
| | | { |
| | | if (replicationCache.restartAfterSaturation(this)) |
| | | if (replicationServerDomain.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | |
| | | public void process(RoutableMessage msg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " processes received msg=" + msg); |
| | | replicationCache.process(msg, this); |
| | | replicationServerDomain.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sends message=" + info); |
| | |
| | | public void receiveReplServerInfo(ReplServerInfoMessage infoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sets replServerInfo " + "<" + infoMsg + ">"); |
| | |
| | | public void send(RoutableMessage msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | TRACER.debugInfo("In " + |
| | | replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sends message=" + msg); |