| | |
| | | */ |
| | | public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | private final String baseDn; |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The Status analyzer that periodically verifies whether the connected DSs |
| | |
| | | private ServerState ctHeartbeatState; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * Creates a new ReplicationServerDomain associated to the baseDN. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn associated to the ReplicationServerDomain. |
| | | * @param baseDN |
| | | * The baseDN associated to the ReplicationServerDomain. |
| | | * @param localReplicationServer |
| | | * the ReplicationServer that created this instance. |
| | | */ |
| | | public ReplicationServerDomain(String baseDn, |
| | | public ReplicationServerDomain(DN baseDN, |
| | | ReplicationServer localReplicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.baseDN = baseDN; |
| | | this.localReplicationServer = localReplicationServer; |
| | | this.assuredTimeoutTimer = new Timer("Replication server RS(" |
| | | + localReplicationServer.getServerId() |
| | | + ") assured timer for domain \"" + baseDn + "\"", true); |
| | | + ") assured timer for domain \"" + baseDN + "\"", true); |
| | | this.changelogDB = localReplicationServer.getChangelogDB(); |
| | | |
| | | DirectoryServer.registerMonitorProvider(this); |
| | |
| | | // Unknown assured mode: should never happen |
| | | Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | assuredMode.toString(), baseDn, update.toString()); |
| | | assuredMode.toString(), baseDN.toNormalizedString(), |
| | | update.toString()); |
| | | logError(errorMsg); |
| | | assuredMessage = false; |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg)) |
| | | if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg)) |
| | | { |
| | | /* |
| | | * JNR: Matt and I had a hard time figuring out where to put this |
| | |
| | | // Should never happen |
| | | Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | Byte.toString(safeDataLevel), baseDN.toNormalizedString(), |
| | | update.toString()); |
| | | logError(errorMsg); |
| | | } else if (sourceGroupId == groupId |
| | | // Assured feature does not cross different group IDS |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | csn.toString(), baseDn)); |
| | | csn.toString(), baseDN.toNormalizedString())); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | csn.toString(), baseDn)); |
| | | csn.toString(), baseDN.toNormalizedString())); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | |
| | | */ |
| | | public Set<Integer> getServerIds() |
| | | { |
| | | return changelogDB.getDomainServerIds(baseDn); |
| | | return changelogDB.getDomainServerIds(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) |
| | | { |
| | | return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN); |
| | | return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | return changelogDB.getCount(baseDn, serverId, from, to); |
| | | return changelogDB.getCount(baseDN, serverId, from, to); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getChangesCount() |
| | | { |
| | | return changelogDB.getDomainChangesCount(baseDn); |
| | | return changelogDB.getDomainChangesCount(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Get the baseDn. |
| | | * @return Returns the baseDn. |
| | | * Get the baseDN. |
| | | * |
| | | * @return Returns the baseDN. |
| | | */ |
| | | public String getBaseDn() |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDn; |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | baseDN.toNormalizedString(), Integer.toString(msg.getDestination()))); |
| | | mb.append(" In Replication Server=").append( |
| | | this.localReplicationServer.getMonitorInstanceName()); |
| | | mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); |
| | |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(msg.getDestination()))); |
| | | mb.append(" unroutable message =" + msg.getClass().getSimpleName()); |
| | | mb.append(" Details: " + ioe.getLocalizedMessage()); |
| | | final Message message = mb.toMessage(); |
| | |
| | | |
| | | stopAllServers(true); |
| | | |
| | | changelogDB.shutdownDomain(baseDn); |
| | | changelogDB.shutdownDomain(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ServerState getDbServerState() |
| | | { |
| | | ServerState serverState = new ServerState(); |
| | | for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values()) |
| | | for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDN).values()) |
| | | { |
| | | serverState.update(lastCSN); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "ReplicationServerDomain " + baseDn; |
| | | return "ReplicationServerDomain " + baseDN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (i == 2) |
| | | { |
| | | Message message = |
| | | ERR_EXCEPTION_SENDING_TOPO_INFO |
| | | .get(baseDn, "directory", Integer.toString(dsHandler |
| | | .getServerId()), e.getMessage()); |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDN.toNormalizedString(), "directory", |
| | | Integer.toString(dsHandler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | if (i == 2) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDn, "replication", |
| | | baseDN.toNormalizedString(), "replication", |
| | | Integer.toString(rsHandler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from " |
| | | + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n" |
| | | + genIdMsg); |
| | | debug("Receiving ResetGenerationIdMsg from " |
| | | + senderHandler.getServerId() + ":\n" + genIdMsg); |
| | | } |
| | | |
| | | try |
| | |
| | | // Order to take a gen id we already have, just ignore |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this |
| | | + " Reset generation id requested for baseDn " + baseDn |
| | | + " but generation id was already " + this.generationId + ":\n" |
| | | + genIdMsg); |
| | | debug("Reset generation id requested but generationId was already " |
| | | + this.generationId + ":\n" + genIdMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn, |
| | | e.getMessage())); |
| | | logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get( |
| | | baseDN.toNormalizedString(), e.getMessage())); |
| | | } |
| | | } |
| | | |
| | |
| | | dsHandler.changeStatusForResetGenId(newGenId); |
| | | } catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn, |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get( |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(dsHandler.getServerId()), |
| | | e.getMessage())); |
| | | } |
| | |
| | | // treatment. |
| | | sendTopoInfoToAll(); |
| | | |
| | | logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId)); |
| | | logError(NOTE_RESET_GENERATION_ID.get(baseDN.toNormalizedString(), |
| | | newGenId)); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | sendTopoInfoToAllExcept(senderHandler); |
| | | |
| | | Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get( |
| | | senderHandler.getServerId(), baseDn, newStatus.toString()); |
| | | senderHandler.getServerId(), baseDN.toNormalizedString(), |
| | | newStatus.toString()); |
| | | logError(message); |
| | | } |
| | | catch(Exception e) |
| | |
| | | // StatusAnalyzer. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDn |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDN |
| | | + " has been interrupted when" |
| | | + " trying to acquire domain lock for changing the status of DS " |
| | | + dsHandler.getServerId()); |
| | |
| | | catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER |
| | | .get(baseDn, |
| | | .get(baseDN.toNormalizedString(), |
| | | Integer.toString(dsHandler.getServerId()), |
| | | e.getMessage())); |
| | | } |
| | |
| | | public void clearDbs() |
| | | { |
| | | // Reset the localchange and state db for the current domain |
| | | changelogDB.clearDomain(baseDn); |
| | | changelogDB.clearDomain(baseDN); |
| | | try |
| | | { |
| | | localReplicationServer.clearGenerationId(baseDn); |
| | | localReplicationServer.clearGenerationId(baseDN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | rsHandler.getServerId(), |
| | | rsHandler.session.getReadableRemoteAddress(), |
| | | rsHandler.getGenerationId(), |
| | | baseDn, getLocalRSServerId(), generationId); |
| | | baseDN.toNormalizedString(), getLocalRSServerId(), generationId); |
| | | logError(message); |
| | | |
| | | ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(), |
| | |
| | | { |
| | | return "Replication server RS(" + localReplicationServer.getServerId() |
| | | + ") " + localReplicationServer.getServerURL() + ",cn=" |
| | | + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication"; |
| | | + baseDN.toNormalizedString().replace(',', '_').replace('=', '_') |
| | | + ",cn=Replication"; |
| | | } |
| | | |
| | | /** |
| | |
| | | String.valueOf(localReplicationServer.getServerId()))); |
| | | attributes.add(Attributes.create("replication-server-port", |
| | | String.valueOf(localReplicationServer.getReplicationPort()))); |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | attributes.add(Attributes.create("domain-name", |
| | | baseDN.toNormalizedString())); |
| | | attributes.add(Attributes.create("generation-id", |
| | | baseDn + " " + generationId)); |
| | | baseDN + " " + generationId)); |
| | | |
| | | // Missing changes |
| | | long missingChanges = getDomainMonitorData().getMissingChangesRS( |
| | |
| | | if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) |
| | | { |
| | | // let's try to seek the first change <= eligibleCSN |
| | | CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN); |
| | | CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN); |
| | | result.update(newCSN); |
| | | } else { |
| | | // for this serverId, all changes in the ChangelogDb are holder |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER |
| | | .debugInfo("In " + this + " getEligibleState() result is " + result); |
| | | debug("getEligibleState() result is " + result); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | public ServerState getStartState() |
| | | { |
| | | ServerState domainStartState = new ServerState(); |
| | | for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values()) |
| | | for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDN).values()) |
| | | { |
| | | domainStartState.update(firstCSN); |
| | | } |
| | |
| | | CSN eligibleCSN = null; |
| | | |
| | | for (Entry<Integer, CSN> entry : |
| | | changelogDB.getDomainLastCSNs(baseDn).entrySet()) |
| | | changelogDB.getDomainLastCSNs(baseDN).entrySet()) |
| | | { |
| | | // Consider this producer (DS/db). |
| | | final int serverId = entry.getKey(); |
| | |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG |
| | | .get("Replication Server " |
| | | + localReplicationServer.getReplicationPort() + " " |
| | | + baseDn + " " + localReplicationServer.getServerId())); |
| | | + baseDN + " " + localReplicationServer.getServerId())); |
| | | stopServer(rsHandler, false); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | return changelogDB.getDomainLatestTrimDate(baseDn); |
| | | return changelogDB.getDomainLatestTrimDate(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private void debug(String message) |
| | | { |
| | | TRACER.debugInfo("In RS serverId=" + localReplicationServer.getServerId() |
| | | + " for baseDn=" + baseDn + " and port=" |
| | | + localReplicationServer.getReplicationPort() + ": " + message); |
| | | TRACER.debugInfo("In ReplicationServerDomain serverId=" |
| | | + localReplicationServer.getServerId() + " for baseDN=" + baseDN |
| | | + " and port=" + localReplicationServer.getReplicationPort() |
| | | + ": " + message); |
| | | } |
| | | } |