| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.ServerConstants.EOL; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | import static org.opends.server.util.StaticUtils.isLocalAddress; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import org.opends.server.types.SearchScope; |
| | | |
| | | |
| | | |
| | | /** |
| | | * ReplicationServer Listener. This singleton is the main object of the |
| | |
| | | private Thread listenThread; |
| | | private Thread connectThread; |
| | | |
| | | /* The list of replication servers configured by the administrator */ |
| | | /** The list of replication servers configured by the administrator. */ |
| | | private Collection<String> replicationServers; |
| | | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | /** |
| | | * This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private final Map<String, ReplicationServerDomain> baseDNs = |
| | |
| | | private int queueSize; |
| | | private String dbDirname = null; |
| | | |
| | | // The delay (in sec) after which the changes must |
| | | // be deleted from the persistent storage. |
| | | /** |
| | | * The delay (in sec) after which the changes must be deleted from the |
| | | * persistent storage. |
| | | */ |
| | | private long purgeDelay; |
| | | |
| | | private int replicationPort; |
| | | private boolean stopListen = false; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | // For the backend associated to this replication server, |
| | | // DN of the config entry of the backend |
| | | /** |
| | | * For the backend associated to this replication server, DN of the config |
| | | * entry of the backend. |
| | | */ |
| | | private DN backendConfigEntryDN; |
| | | // ID of the backend |
| | | /** ID of the backend. */ |
| | | private static final String backendId = "replicationChanges"; |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | // Timeout (in milliseconds) when waiting for acknowledgments |
| | | /** Timeout (in milliseconds) when waiting for acknowledgments. */ |
| | | private long assuredTimeout = 1000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | | /** Group id. */ |
| | | private byte groupId = 1; |
| | | |
| | | // Number of pending changes for a DS, considered as threshold value to put |
| | | // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled |
| | | /** |
| | | * Number of pending changes for a DS, considered as threshold value to put |
| | | * the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled. |
| | | */ |
| | | private int degradedStatusThreshold = 5000; |
| | | |
| | | // Number of milliseconds to wait before sending new monitoring messages. |
| | | // If value is 0, monitoring publisher is disabled |
| | | /** |
| | | * Number of milliseconds to wait before sending new monitoring messages. If |
| | | * value is 0, monitoring publisher is disabled. |
| | | */ |
| | | private long monitoringPublisherPeriod = 3000; |
| | | |
| | | // The handler of the draft change numbers database, the database used to |
| | | // store the relation between a draft change number ('seqnum') and the |
| | | // associated cookie. |
| | | // |
| | | // Guarded by draftCNLock |
| | | // |
| | | /** |
| | | * The handler of the draft change numbers database, the database used to |
| | | * store the relation between a draft change number ('seqnum') and the |
| | | * associated cookie. |
| | | * <p> |
| | | * Guarded by draftCNLock |
| | | */ |
| | | private DraftCNDbHandler draftCNDbHandler; |
| | | |
| | | // The last value generated of the draft change number. |
| | | // |
| | | // Guarded by draftCNLock |
| | | // |
| | | /** |
| | | * The last value generated of the draft change number. |
| | | * <p> |
| | | * Guarded by draftCNLock |
| | | **/ |
| | | private int lastGeneratedDraftCN = 0; |
| | | |
| | | // Used for protecting draft CN related state. |
| | | /** Used for protecting draft CN related state. */ |
| | | private final Object draftCNLock = new Object(); |
| | | |
| | | /** |
| | |
| | | private ECLWorkflowElement eclwe; |
| | | private WorkflowImpl externalChangeLogWorkflowImpl = null; |
| | | |
| | | // This is required for unit testing, so that we can keep track of all the |
| | | // replication servers which are running in the VM. |
| | | /** |
| | | * This is required for unit testing, so that we can keep track of all the |
| | | * replication servers which are running in the VM. |
| | | */ |
| | | private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>(); |
| | | |
| | | // Monitors for synchronizing domain creation with the connect thread. |
| | |
| | | private final Object connectThreadLock = new Object(); |
| | | private long domainTicket = 0L; |
| | | |
| | | // ServiceIDs excluded for ECL |
| | | private ArrayList<String> excludedServiceIDs = new ArrayList<String>(); |
| | | /** ServiceIDs excluded for ECL. */ |
| | | private Collection<String> excludedServiceIDs = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * The weight affected to the replication server. |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(e.getLocalizedMessage()); |
| | | mb.append(" "); |
| | |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Signalled to shutdown. |
| | | // Signaled to shutdown. |
| | | return; |
| | | } |
| | | } |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | } |
| | | |
| | | try |
| | | { |
| | | socket.close(); |
| | | } |
| | | catch (IOException ignored) |
| | | { |
| | | // Ignore. |
| | | } |
| | | close(session); |
| | | close(socket); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ReplicationServerCfg configuration) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isConfigurationChangeAcceptable( |
| | | ReplicationServerCfg configuration, List<Message> unacceptableReasons) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processBackupBegin(Backend backend, BackupConfig config) |
| | | { |
| | | // Nothing is needed at the moment |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processBackupEnd(Backend backend, BackupConfig config, |
| | | boolean successful) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processRestoreBegin(Backend backend, RestoreConfig config) |
| | | { |
| | | if (backend.getBackendID().equals(backendId)) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processRestoreEnd(Backend backend, RestoreConfig config, |
| | | boolean successful) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processImportBegin(Backend backend, LDIFImportConfig config) |
| | | { |
| | | // Nothing is needed at the moment |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processImportEnd(Backend backend, LDIFImportConfig config, |
| | | boolean successful) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processExportBegin(Backend backend, LDIFExportConfig config) |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void processExportEnd(Backend backend, LDIFExportConfig config, |
| | | boolean successful) |
| | | { |
| | |
| | | */ |
| | | public void clearDb() |
| | | { |
| | | Iterator<ReplicationServerDomain> rcachei = getDomainIterator(); |
| | | if (rcachei != null) |
| | | for (ReplicationServerDomain rsd : getReplicationServerDomains()) |
| | | { |
| | | while (rcachei.hasNext()) |
| | | { |
| | | ReplicationServerDomain rsd = rcachei.next(); |
| | | rsd.clearDbs(); |
| | | } |
| | | rsd.clearDbs(); |
| | | } |
| | | |
| | | synchronized (draftCNLock) |
| | |
| | | * @param excludedServiceIDs the provided list of serviceIDs excluded from |
| | | * the computation of eligibleCN. |
| | | */ |
| | | public void disableEligibility(ArrayList<String> excludedServiceIDs) |
| | | public void disableEligibility(List<String> excludedServiceIDs) |
| | | { |
| | | this.excludedServiceIDs = excludedServiceIDs; |
| | | } |
| | |
| | | // traverse the domains and get the eligible CN from each domain |
| | | // store the oldest one as the cross domain eligible CN |
| | | ChangeNumber eligibleCN = null; |
| | | Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator(); |
| | | if (rsdi != null) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | while (rsdi.hasNext()) |
| | | { |
| | | ReplicationServerDomain domain = rsdi.next(); |
| | | if ((excludedServiceIDs != null) && |
| | | excludedServiceIDs.contains(domain.getBaseDn())) |
| | | continue; |
| | | if ((excludedServiceIDs != null) && |
| | | excludedServiceIDs.contains(domain.getBaseDn())) |
| | | continue; |
| | | |
| | | ChangeNumber domainEligibleCN = domain.getEligibleCN(); |
| | | String dates = ""; |
| | | if (domainEligibleCN != null) |
| | | ChangeNumber domainEligibleCN = domain.getEligibleCN(); |
| | | String dates = ""; |
| | | if (domainEligibleCN != null) |
| | | { |
| | | if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN))) |
| | | { |
| | | if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN))) |
| | | { |
| | | eligibleCN = domainEligibleCN; |
| | | } |
| | | dates = new Date(domainEligibleCN.getTime()).toString(); |
| | | eligibleCN = domainEligibleCN; |
| | | } |
| | | debugLog += "[dn=" + domain.getBaseDn() |
| | | + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]"; |
| | | dates = new Date(domainEligibleCN.getTime()).toString(); |
| | | } |
| | | debugLog += "[dn=" + domain.getBaseDn() |
| | | + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]"; |
| | | } |
| | | |
| | | if (eligibleCN==null) |
| | |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | |
| | | int firstDraftCN; |
| | | int lastDraftCN; |
| | | Boolean dbEmpty = false; |
| | | Long newestDate = 0L; |
| | | DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler(); |
| | | |
| | | // Get the first DraftCN from the DraftCNdb |
| | | firstDraftCN = draftCNDbH.getFirstKey(); |
| | | int firstDraftCN = draftCNDbH.getFirstKey(); |
| | | Map<String,ServerState> domainsServerStateForLastSeqnum = null; |
| | | ChangeNumber changeNumberForLastSeqnum = null; |
| | | String domainForLastSeqnum = null; |
| | |
| | | } |
| | | |
| | | // Domain by domain |
| | | Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator(); |
| | | if (rsdi != null) |
| | | for (ReplicationServerDomain rsd : getReplicationServerDomains()) |
| | | { |
| | | while (rsdi.hasNext()) |
| | | if (excludedServiceIDs.contains(rsd.getBaseDn())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the last DraftCN update is |
| | | long ec; |
| | | if (domainsServerStateForLastSeqnum == null) |
| | | { |
| | | // process a domain |
| | | ReplicationServerDomain rsd = rsdi.next(); |
| | | |
| | | if (excludedServiceIDs.contains(rsd.getBaseDn())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the last DraftCN update is |
| | | long ec; |
| | | if (domainsServerStateForLastSeqnum == null) |
| | | { |
| | | // Count changes of this domain from the beginning of the changelog |
| | | ChangeNumber trimCN = |
| | | new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0); |
| | | ec = rsd.getEligibleCount( |
| | | rsd.getStartState().duplicateOnlyOlderThan(trimCN), |
| | | crossDomainEligibleCN); |
| | | } |
| | | else |
| | | { |
| | | // There are records in the draftDB (so already returned to clients) |
| | | // BUT |
| | | // There is nothing related to this domain in the last draft record |
| | | // (may be this domain was disabled when this record was returned). |
| | | // In that case, are counted the changes from |
| | | // the date of the most recent change from this last draft record |
| | | if (newestDate == 0L) |
| | | { |
| | | newestDate = changeNumberForLastSeqnum.getTime(); |
| | | } |
| | | |
| | | // And count changes of this domain from the date of the |
| | | // lastseqnum record (that does not refer to this domain) |
| | | ChangeNumber cnx = new ChangeNumber(newestDate, |
| | | changeNumberForLastSeqnum.getSeqnum(), 0); |
| | | ec = rsd.getEligibleCount(cnx, crossDomainEligibleCN); |
| | | |
| | | if (domainForLastSeqnum.equalsIgnoreCase(rsd.getBaseDn())) |
| | | ec--; |
| | | } |
| | | |
| | | // cumulates on domains |
| | | lastDraftCN += ec; |
| | | |
| | | // DraftCN Db is empty and there are eligible updates in the replication |
| | | // changelog then init first DraftCN |
| | | if ((ec>0) && (firstDraftCN==0)) |
| | | firstDraftCN = 1; |
| | | // Count changes of this domain from the beginning of the changelog |
| | | ChangeNumber trimCN = |
| | | new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0); |
| | | ec = rsd.getEligibleCount( |
| | | rsd.getStartState().duplicateOnlyOlderThan(trimCN), |
| | | crossDomainEligibleCN); |
| | | } |
| | | else |
| | | { |
| | | // There are records in the draftDB (so already returned to clients) |
| | | // BUT |
| | | // There is nothing related to this domain in the last draft record |
| | | // (may be this domain was disabled when this record was returned). |
| | | // In that case, are counted the changes from |
| | | // the date of the most recent change from this last draft record |
| | | if (newestDate == 0L) |
| | | { |
| | | newestDate = changeNumberForLastSeqnum.getTime(); |
| | | } |
| | | |
| | | // And count changes of this domain from the date of the |
| | | // lastseqnum record (that does not refer to this domain) |
| | | ChangeNumber cnx = new ChangeNumber(newestDate, |
| | | changeNumberForLastSeqnum.getSeqnum(), 0); |
| | | ec = rsd.getEligibleCount(cnx, crossDomainEligibleCN); |
| | | |
| | | if (domainForLastSeqnum.equalsIgnoreCase(rsd.getBaseDn())) |
| | | ec--; |
| | | } |
| | | |
| | | // cumulates on domains |
| | | lastDraftCN += ec; |
| | | |
| | | // DraftCN Db is empty and there are eligible updates in the replication |
| | | // changelog then init first DraftCN |
| | | if ((ec>0) && (firstDraftCN==0)) |
| | | firstDraftCN = 1; |
| | | } |
| | | if (dbEmpty) |
| | | { |
| | |
| | | * @return the last cookie value. |
| | | */ |
| | | public MultiDomainServerState getLastECLCookie( |
| | | ArrayList<String> excludedServiceIDs) |
| | | List<String> excludedServiceIDs) |
| | | { |
| | | disableEligibility(excludedServiceIDs); |
| | | |
| | | MultiDomainServerState result = new MultiDomainServerState(); |
| | | // Initialize start state for all running domains with empty state |
| | | Iterator<ReplicationServerDomain> rsdk = this.getDomainIterator(); |
| | | if (rsdk != null) |
| | | for (ReplicationServerDomain rsd : getReplicationServerDomains()) |
| | | { |
| | | while (rsdk.hasNext()) |
| | | { |
| | | // process a domain |
| | | ReplicationServerDomain rsd = rsdk.next(); |
| | | if ((excludedServiceIDs != null) |
| | | && (excludedServiceIDs.contains(rsd.getBaseDn()))) |
| | | continue; |
| | | |
| | | if ((excludedServiceIDs!=null) |
| | | && (excludedServiceIDs.contains(rsd.getBaseDn()))) |
| | | continue; |
| | | if (rsd.getDbServerState().isEmpty()) |
| | | continue; |
| | | |
| | | if (rsd.getDbServerState().isEmpty()) |
| | | continue; |
| | | |
| | | result.update(rsd.getBaseDn(), rsd.getEligibleState( |
| | | getEligibleCN())); |
| | | } |
| | | result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN())); |
| | | } |
| | | return result; |
| | | } |