| | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | |
| | | // every n number of treated assured messages |
| | | private int assuredTimeoutTimerPurgeCounter = 0; |
| | | |
| | | ServerState ctHeartbeatState = null; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * |
| | |
| | | if ( (generationId>0) && (generationId != handler.getGenerationId()) ) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | TRACER.debugInfo("In " + this.getName() + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to replication server " + |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | TRACER.debugInfo("In " + this + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to directory server " + |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + |
| | | " stopServer(SH)" + handler.getMonitorInstanceName() + |
| | | " " + stackTraceToSingleLineString(new Exception())); |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + " stopServer() on the server handler " + |
| | | handler.getMonitorInstanceName()); |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + |
| | | " stopServer(MH)" + handler.getMonitorInstanceName() + |
| | | " " + stackTraceToSingleLineString(new Exception())); |
| | | "In " + this.replicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the message handler " |
| | | + handler.getMonitorInstanceName()); |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | |
| | | |
| | | try |
| | | { |
| | | return handler.generateIterator(changeNumber); |
| | | ReplicationIterator it = handler.generateIterator(changeNumber); |
| | | if (it.next()==false) |
| | | { |
| | | it.releaseCursor(); |
| | | throw new Exception("no new change"); |
| | | } |
| | | return it; |
| | | } catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates and returns an iterator. |
| | | * When the iterator is not used anymore, the caller MUST call the |
| | | * ReplicationIterator.releaseCursor() method to free the resources |
| | | * and locks used by the ReplicationIterator. |
| | | * |
| | | * @param serverId Identifier of the server for which the iterator is created. |
| | | * @param changeNumber Starting point for the iterator. |
| | | * @return the created ReplicationIterator. Null when no DB is available |
| | | * for the provided server Id. |
| | | */ |
| | | public ReplicationIterator getIterator(short serverId, |
| | | ChangeNumber changeNumber) |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | | if (handler == null) |
| | | return null; |
| | | try |
| | | { |
| | | ReplicationIterator it = handler.generateIterator(changeNumber); |
| | | return it; |
| | | } catch (Exception e) |
| | | { |
| | | return null; |
| | |
| | | ResetGenerationIdMsg genIdMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + getReplicationServer().getServerId() + |
| | | "In " + this + |
| | | " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() + |
| | | " for baseDn " + baseDn + ":\n" + genIdMsg); |
| | | } |
| | | |
| | | try |
| | | { |
| | |
| | | { |
| | | // Order to take a gen id we already have, just ignore |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + getReplicationServer().getServerId() |
| | | "In " + this |
| | | + " Reset generation id requested for baseDn " + baseDn |
| | | + " but generation id was already " + this.generationId |
| | | + ":\n" + genIdMsg); |
| | | } |
| | | } |
| | | |
| | | // If we are the first replication server warned, |
| | | // then forwards the reset message to the remote replication servers |
| | |
| | | rsHandler.setGenerationId(newGenId); |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | rsHandler.forwardGenerationIdToRS(genIdMsg); |
| | | rsHandler.send(genIdMsg); |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears the Db associated with that cache. |
| | | * Clears the Db associated with that domain. |
| | | */ |
| | | public void clearDbs() |
| | | { |
| | |
| | | } |
| | | } |
| | | stopDbHandlers(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDN=" + baseDn + |
| | | " The source db handler has been cleared"); |
| | | } |
| | | try |
| | | { |
| | |
| | | */ |
| | | public void receivesMonitorDataResponse(MonitorMsg msg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Receiving " + msg + " from " + msg.getsenderID()); |
| | | |
| | | try |
| | | { |
| | | synchronized (monitorDataLock) |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "In " + this + |
| | | " baseDn=" + baseDn + |
| | | " Processed msg from " + msg.getsenderID() + |
| | | " New monitor data: " + wrkMonitorData.toString()); |
| | |
| | | * Returns the state that contains, for each server, the time of eligibility. |
| | | * @return the state. |
| | | */ |
| | | public ServerState getHeartbeatState() |
| | | public ServerState getChangeTimeHeartbeatState() |
| | | { |
| | | // TODO:ECL Eligility must be supported |
| | | return this.getDbServerState(); |
| | | if (ctHeartbeatState == null) |
| | | { |
| | | ctHeartbeatState = this.getDbServerState().duplicate(); |
| | | } |
| | | return ctHeartbeatState; |
| | | } |
| | | |
| | | /** |
| | | * TODO: code cleaning - remove this method. |
| | | * Computes the change number eligible to the ECL. |
| | | * @return null if the domain does not play in eligibility. |
| | | */ |
| | | public ChangeNumber computeEligibleCN() |
| | | public ChangeNumber computeEligibleCN2() |
| | | { |
| | | ChangeNumber elligibleCN = null; |
| | | ServerState heartbeatState = getHeartbeatState(); |
| | | ChangeNumber eligibleCN = null; |
| | | ServerState heartbeatState = getChangeTimeHeartbeatState(); |
| | | |
| | | if (heartbeatState==null) |
| | | return null; |
| | | |
| | | // compute elligible CN |
| | | // compute eligible CN |
| | | ServerState hbState = heartbeatState.duplicate(); |
| | | |
| | | Iterator<Short> it = hbState.iterator(); |
| | |
| | | if (TimeThread.getTime()-storedCN.getTime()>2000) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + this.baseDn + " Server " + sid |
| | | TRACER.debugInfo("In " + this.getName() + |
| | | " Server " + sid |
| | | + " is not considered for eligibility ... potentially down"); |
| | | continue; |
| | | } |
| | | |
| | | if ((elligibleCN == null) || (storedCN.older(elligibleCN))) |
| | | if ((eligibleCN == null) || (storedCN.older(eligibleCN))) |
| | | { |
| | | elligibleCN = storedCN; |
| | | eligibleCN = storedCN; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN); |
| | | return elligibleCN; |
| | | TRACER.debugInfo("In " + this.getName() + |
| | | " computeEligibleCN() returns " + eligibleCN); |
| | | return eligibleCN; |
| | | } |
| | | |
| | | /** |
| | | * Computes the eligible server state by minimizing the dbServerState and the |
| | | * elligibleCN. |
| | | * Computes the eligible server state for the domain. |
| | | * Consists in taking the most recent change from the dbServerState and the |
| | | * eligibleCN. |
| | | * @param eligibleCN The provided eligibleCN. |
| | | * @return The computed eligible server state. |
| | | */ |
| | | public ServerState getCLElligibleState() |
| | | public ServerState getEligibleState(ChangeNumber eligibleCN) |
| | | { |
| | | // ChangeNumber elligibleCN = computeEligibleCN(); |
| | | ServerState res = new ServerState(); |
| | | ServerState dbState = this.getDbServerState(); |
| | | res = dbState; |
| | | ServerState result = new ServerState(); |
| | | |
| | | /* TODO:ECL Eligibility is not yet implemented |
| | | Iterator<Short> it = dbState.iterator(); |
| | | while (it.hasNext()) |
| | | ServerState dbState = this.getDbServerState(); |
| | | |
| | | result = dbState.duplicate(); |
| | | |
| | | if (eligibleCN != null) |
| | | { |
| | | Short sid = it.next(); |
| | | DbHandler h = sourceDbHandlers.get(sid); |
| | | ChangeNumber dbCN = dbState.getMaxChangeNumber(sid); |
| | | try |
| | | Iterator<Short> it = dbState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | if ((elligibleCN!=null)&&(elligibleCN.older(dbCN))) |
| | | Short sid = it.next(); |
| | | DbHandler h = sourceDbHandlers.get(sid); |
| | | ChangeNumber dbCN = dbState.getMaxChangeNumber(sid); |
| | | try |
| | | { |
| | | // some CN exist in the db newer than elligible CN |
| | | ReplicationIterator ri = h.generateIterator(elligibleCN); |
| | | ChangeNumber newCN = ri.getCurrentCN(); |
| | | res.update(newCN); |
| | | ri.releaseCursor(); |
| | | if (eligibleCN.older(dbCN)) |
| | | { |
| | | // some CN exist in the db newer than eligible CN |
| | | // let's get it |
| | | ReplicationIterator ri = h.generateIterator(eligibleCN); |
| | | try |
| | | { |
| | | if ((ri != null) && (ri.getChange()!=null)) |
| | | { |
| | | ChangeNumber newCN = ri.getChange().getChangeNumber(); |
| | | result.update(newCN); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | ri.releaseCursor(); |
| | | ri = null; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // no CN exist in the db newer than elligible CN |
| | | result.update(dbCN); |
| | | } |
| | | } |
| | | else |
| | | catch(Exception e) |
| | | { |
| | | // no CN exist in the db newer than elligible CN |
| | | res.update(dbCN); |
| | | Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get( |
| | | " " + stackTraceToSingleLineString(e)); |
| | | logError(errMessage); |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | */ |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this.getName() |
| | | + " getCLElligibleState returns:" + res); |
| | | return res; |
| | | TRACER.debugInfo("In " + this |
| | | + " getEligibleState() result is " + result); |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | return domainStartState; |
| | | } |
| | | |
| | | /** |
| | | * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat |
| | | * state. |
| | | * For each DS, considers the last CN of the changelog db and the CN held in |
| | | * the change time heartbeat state, and keeps the most recent of all of them. |
| | | * Can be null. |
| | | * @return the eligible CN. |
| | | */ |
| | | public ChangeNumber getEligibleCN() |
| | | { |
| | | ChangeNumber eligibleCN = null; |
| | | |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | | // Consider this producer (DS/db). |
| | | short sid = db.getServerId(); |
| | | |
| | | ChangeNumber changelogLastCN = db.getLastChange(); |
| | | if (changelogLastCN != null) |
| | | { |
| | | if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN))) |
| | | { |
| | | eligibleCN = changelogLastCN; |
| | | } |
| | | } |
| | | |
| | | ChangeNumber heartbeatLastDN = |
| | | getChangeTimeHeartbeatState().getMaxChangeNumber(sid); |
| | | |
| | | if ((heartbeatLastDN != null) && |
| | | ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN)))) |
| | | { |
| | | eligibleCN = heartbeatLastDN; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.getName() + " getEligibleCN() returns result =" |
| | | + eligibleCN); |
| | | return eligibleCN; |
| | | } |
| | | |
| | | /** |
| | | * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp) |
| | | * value received, and forwarding the message to the other RSes. |
| | | * @param senderHandler The handler for the server that sent the heartbeat. |
| | | * @param msg The message to process. |
| | | */ |
| | | public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler, |
| | |     ChangeTimeHeartbeatMsg msg ) |
| | | { |
| | |   try |
| | |   { |
| | |     // Acquire lock on domain (see more details in comment of start() method |
| | |     // of ServerHandler) |
| | |     lock(); |
| | |   } catch (InterruptedException ex) |
| | |   { |
| | |     // Best effort: the heartbeat is processed anyway, but the interruption |
| | |     // is traced rather than silently dropped. |
| | |     // NOTE(review): release() in the finally block below still runs when |
| | |     // lock() was interrupted - confirm release() tolerates that. |
| | |     TRACER.debugCaught(DebugLogLevel.ERROR, ex); |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     // Record the change time carried by the heartbeat. |
| | |     storeReceivedCTHeartbeat(msg.getChangeNumber()); |
| | | |
| | |     // Forward the heartbeat to the replication server handlers, but only |
| | |     // when it was sent by a data server. |
| | |     for (ReplicationServerHandler rsHandler : replicationServers.values()) |
| | |     { |
| | |       try |
| | |       { |
| | |         if (senderHandler.isDataServer()) |
| | |         { |
| | |           rsHandler.send(msg); |
| | |         } |
| | |       } catch (IOException e) |
| | |       { |
| | |         // Sending failed: trace and log the error, then stop that handler. |
| | |         TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | |         logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName())); |
| | |         stopServer(rsHandler); |
| | |       } |
| | |     } |
| | |   } |
| | |   finally |
| | |   { |
| | |     // Paired with the lock() call at the top of the method. |
| | |     release(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Store a change time value received from a data server. |
| | | * @param cn The provided change time. |
| | | */ |
| | | public void storeReceivedCTHeartbeat(ChangeNumber cn) |
| | | { |
| | |   // TODO:May be we can spare processing by only storing CN (timestamp) |
| | |   // instead of a server state. |
| | |   ServerState heartbeatState = getChangeTimeHeartbeatState(); |
| | |   heartbeatState.update(cn); |
| | | } |
| | | |
| | | /** |
| | | * This method counts the changes, server by server: |
| | | * - from a start point (cn taken from the provided startState) |
| | | * - to an end point (the provided endCN). |
| | | * @param startState The provided start server state. |
| | | * @param endCN The provided end change number. |
| | | * @return The number of changes between startState and endCN. |
| | | */ |
| | | public long getEligibleCount(ServerState startState, ChangeNumber endCN) |
| | | { |
| | |   long res = 0; |
| | | |
| | |   // Parses the dbState of the domain, server by server |
| | |   ServerState dbState = this.getDbServerState(); |
| | |   Iterator<Short> it = dbState.iterator(); |
| | |   while (it.hasNext()) |
| | |   { |
| | |     // for each server |
| | |     Short sid = it.next(); |
| | |     DbHandler h = sourceDbHandlers.get(sid); |
| | |     if (h == null) |
| | |     { |
| | |       // No db handler for this server id: nothing to count for it. |
| | |       continue; |
| | |     } |
| | | |
| | |     try |
| | |     { |
| | |       // Set on the change related to the startState. The cursor is local |
| | |       // to this lookup so a failed release can never leave a stale cursor |
| | |       // behind for a later iteration. |
| | |       ChangeNumber startCN = null; |
| | |       ReplicationIterator startCursor = null; |
| | |       try |
| | |       { |
| | |         startCursor = |
| | |           h.generateIterator(startState.getMaxChangeNumber(sid)); |
| | |         startCN = startCursor.getChange().getChangeNumber(); |
| | |       } |
| | |       catch(Exception e) |
| | |       { |
| | |         TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | |         // no change found (purge from CL) |
| | |         startCN = null; |
| | |       } |
| | |       finally |
| | |       { |
| | |         if (startCursor != null) |
| | |         { |
| | |           startCursor.releaseCursor(); |
| | |         } |
| | |       } |
| | | |
| | |       if (startCN == null) |
| | |       { |
| | |         // Nothing to count for this server. |
| | |         continue; |
| | |       } |
| | | |
| | |       // Set on the change related to the endCN |
| | |       ChangeNumber upperCN; |
| | |       ReplicationIterator endCursor = null; |
| | |       try |
| | |       { |
| | |         // Build a changenumber for this very server, with the timestamp |
| | |         // of the endCN |
| | |         ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid); |
| | |         endCursor = h.generateIterator(f); |
| | |         upperCN = endCursor.getChange().getChangeNumber(); |
| | |       } |
| | |       catch(Exception e) |
| | |       { |
| | |         TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | |         // no new change: fall back to the last change of the db |
| | |         upperCN = h.getLastChange(); |
| | |       } |
| | |       finally |
| | |       { |
| | |         if (endCursor != null) |
| | |         { |
| | |           endCursor.releaseCursor(); |
| | |         } |
| | |       } |
| | | |
| | |       if (upperCN == null) |
| | |       { |
| | |         // No upper bound available: skip this server explicitly instead |
| | |         // of relying on a NullPointerException. |
| | |         continue; |
| | |       } |
| | | |
| | |       // Both bounds are counted, hence the + 1. |
| | |       res += upperCN.getSeqnum() - startCN.getSeqnum() + 1; |
| | |       // TODO:ECL We should compute if changenumber.seqnum has turned ! |
| | |     } |
| | |     catch(Exception e) |
| | |     { |
| | |       TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | |     } |
| | |   } |
| | |   return res; |
| | | } |
| | | } |