| | |
| | | import java.net.ServerSocket; |
| | | import java.net.Socket; |
| | | import java.net.UnknownHostException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.*; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.WorkflowImpl; |
| | | import org.opends.server.core.networkgroups.NetworkGroup; |
| | | import org.opends.server.loggers.LogLevel; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import java.util.Collections; |
| | | |
| | | /** |
| | | * ReplicationServer Listener. |
| | |
| | | * It is responsible for creating the replication server replicationServerDomain |
| | | * and managing it |
| | | */ |
| | | public class ReplicationServer |
| | | public final class ReplicationServer |
| | | implements ConfigurationChangeListener<ReplicationServerCfg>, |
| | | BackupTaskListener, RestoreTaskListener, ImportTaskListener, |
| | | ExportTaskListener |
| | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private ConcurrentHashMap<String, ReplicationServerDomain> baseDNs = |
| | | new ConcurrentHashMap<String, ReplicationServerDomain>(); |
| | | private final Map<String, ReplicationServerDomain> baseDNs = |
| | | new HashMap<String, ReplicationServerDomain>(); |
| | | |
| | | private String localURL = "null"; |
| | | private volatile boolean shutdown = false; |
| | |
| | | // ID of the backend |
| | | private static final String backendId = "replicationChanges"; |
| | | |
| | | // At startup, the listen thread wait on this flag for the connect |
| | | // thread to look for other servers in the topology. |
| | | private boolean connectedInTopology = false; |
| | | private final Object connectedInTopologyLock = new Object(); |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | |
| | | // The handler of the draft change numbers database, the database used to |
| | | // store the relation between a draft change number ('seqnum') and the |
| | | // associated cookie. |
| | | // |
| | | // Guarded by draftCNLock |
| | | // |
| | | private DraftCNDbHandler draftCNDbHandler; |
| | | |
| | | // The last value generated of the draft change number. |
| | | // |
| | | // Guarded by draftCNLock |
| | | // |
| | | private int lastGeneratedDraftCN = 0; |
| | | |
| | | // Used for protecting draft CN related state. |
| | | private final Object draftCNLock = new Object(); |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | |
| | | |
| | | private static String externalChangeLogWorkflowID = |
| | | "External Changelog Workflow ID"; |
| | | ECLWorkflowElement eclwe; |
| | | WorkflowImpl externalChangeLogWorkflowImpl = null; |
| | | private ECLWorkflowElement eclwe; |
| | | private WorkflowImpl externalChangeLogWorkflowImpl = null; |
| | | |
| | | private static HashSet<Integer> localPorts = new HashSet<Integer>(); |
| | | |
| | | // used to synchronize the domain creation with the connect thread. |
| | | final private Object domainMonitor = new Object(); |
| | | // Monitors for synchronizing domain creation with the connect thread. |
| | | private final Object domainTicketLock = new Object(); |
| | | private final Object connectThreadLock = new Object(); |
| | | private long domainTicket = 0L; |
| | | |
| | | // ServiceIDs excluded for ECL |
| | | private ArrayList<String> excludedServiceIDs = new ArrayList<String>(); |
| | |
| | | |
| | | void runListen() |
| | | { |
| | | // wait for the connect thread to find other replication |
| | | // servers in the topology before starting to accept connections |
| | | // from the ldap servers. |
| | | synchronized (connectedInTopologyLock) |
| | | { |
| | | if (connectedInTopology == false) |
| | | { |
| | | try |
| | | { |
| | | connectedInTopologyLock.wait(1000); |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | } |
| | | } |
| | | |
| | | while ((shutdown == false) && (stopListen == false)) |
| | | { |
| | | // Wait on the replicationServer port. |
| | |
| | | */ |
| | | void runConnect() |
| | | { |
| | | while (shutdown == false) |
| | | synchronized (connectThreadLock) |
| | | { |
| | | /* |
| | | * periodically check that we are connected to all other |
| | | * replication servers and if not establish the connection |
| | | */ |
| | | for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) |
| | | while (!shutdown) |
| | | { |
| | | Set<String> connectedReplServers = |
| | | replicationServerDomain.getChangelogs(); |
| | | /* |
| | | * check that all replication server in the config are in the connected |
| | | * Set. If not create the connection |
| | | * periodically check that we are connected to all other replication |
| | | * servers and if not establish the connection |
| | | */ |
| | | for (String serverURL : replicationServers) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | String port = serverURL.substring(separator + 1); |
| | | String hostname = serverURL.substring(0, separator); |
| | | Set<String> connectedReplServers = domain.getChangelogs(); |
| | | |
| | | try |
| | | /* |
| | | * check that all replication server in the config are in the |
| | | * connected Set. If not create the connection |
| | | */ |
| | | for (String serverURL : replicationServers) |
| | | { |
| | | InetAddress inetAddress = InetAddress.getByName(hostname); |
| | | String serverAddress = inetAddress.getHostAddress() + ":" + port; |
| | | String alternServerAddress = null; |
| | | if (hostname.equalsIgnoreCase("localhost")) |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | String port = serverURL.substring(separator + 1); |
| | | String hostname = serverURL.substring(0, separator); |
| | | |
| | | try |
| | | { |
| | | // if "localhost" was used as the hostname in the configuration |
| | | // also check is the connection is already opened with the |
| | | // local address. |
| | | alternServerAddress = |
| | | InetAddress.getLocalHost().getHostAddress() + ":" + port; |
| | | } |
| | | if (inetAddress.equals(InetAddress.getLocalHost())) |
| | | { |
| | | // if the host address is the local one, also check |
| | | // if the connection is already opened with the "localhost" |
| | | // address |
| | | alternServerAddress = "127.0.0.1" + ":" + port; |
| | | } |
| | | InetAddress inetAddress = InetAddress |
| | | .getByName(hostname); |
| | | String serverAddress = inetAddress.getHostAddress() |
| | | + ":" + port; |
| | | String alternServerAddress = null; |
| | | |
| | | if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0) |
| | | && (serverAddress.compareTo(this.localURL) != 0) |
| | | && (!connectedReplServers.contains(serverAddress) |
| | | && ((alternServerAddress == null) |
| | | || !connectedReplServers.contains(alternServerAddress)))) |
| | | if (hostname.equalsIgnoreCase("localhost")) |
| | | { |
| | | // if "localhost" was used as the hostname in the configuration |
| | | // also check is the connection is already opened with the |
| | | // local address. |
| | | alternServerAddress = InetAddress.getLocalHost() |
| | | .getHostAddress() + ":" + port; |
| | | } |
| | | |
| | | if (inetAddress.equals(InetAddress.getLocalHost())) |
| | | { |
| | | // if the host address is the local one, also check |
| | | // if the connection is already opened with the "localhost" |
| | | // address |
| | | alternServerAddress = "127.0.0.1" + ":" + port; |
| | | } |
| | | |
| | | if ((serverAddress.compareTo("127.0.0.1:" |
| | | + replicationPort) != 0) |
| | | && (serverAddress.compareTo(this.localURL) != 0) |
| | | && (!connectedReplServers.contains(serverAddress) |
| | | && ((alternServerAddress == null) || !connectedReplServers |
| | | .contains(alternServerAddress)))) |
| | | { |
| | | connect(serverURL, domain.getBaseDn()); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | this.connect(serverURL, replicationServerDomain.getBaseDn()); |
| | | Message message = ERR_COULD_NOT_SOLVE_HOSTNAME |
| | | .get(hostname); |
| | | logError(message); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | | synchronized (connectedInTopologyLock) |
| | | { |
| | | // wake up the listen thread if necessary. |
| | | if (connectedInTopology == false) |
| | | |
| | | // Notify any threads waiting with domain tickets after each iteration. |
| | | synchronized (domainTicketLock) |
| | | { |
| | | connectedInTopologyLock.notify(); |
| | | connectedInTopology = true; |
| | | domainTicket++; |
| | | domainTicketLock.notifyAll(); |
| | | } |
| | | } |
| | | try |
| | | { |
| | | synchronized(domainMonitor) |
| | | |
| | | // Retry each second. |
| | | final int randomizer = (int) (Math.random() * 100); |
| | | try |
| | | { |
| | | domainMonitor.notifyAll(); |
| | | // Releases lock, allows threads to get domain ticket. |
| | | connectThreadLock.wait(1000 + randomizer); |
| | | } |
| | | synchronized (this) |
| | | catch (InterruptedException e) |
| | | { |
| | | /* check if we are connected every second */ |
| | | int randomizer = (int)(Math.random()*100); |
| | | wait(1000 + randomizer); |
| | | // Signalled to shutdown. |
| | | return; |
| | | } |
| | | } catch (InterruptedException e) |
| | | { |
| | | // ignore error, will try to connect again or shutdown |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | private void shutdownECL() |
| | | { |
| | | WorkflowImpl eclwf = |
| | | (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID); |
| | | WorkflowImpl eclwf = (WorkflowImpl) WorkflowImpl |
| | | .getWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // do it only if not already done by another RS (unit test case) |
| | | // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID) |
| | | if (eclwf!=null) |
| | | if (eclwf != null) |
| | | { |
| | | // FIXME:ECL should the ECL Workflow be registered in |
| | | // internalNetworkGroup? |
| | | NetworkGroup internalNetworkGroup = NetworkGroup |
| | | .getInternalNetworkGroup(); |
| | | internalNetworkGroup |
| | | .deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup? |
| | | NetworkGroup adminNetworkGroup = NetworkGroup |
| | | .getAdminNetworkGroup(); |
| | | adminNetworkGroup |
| | | .deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup? |
| | | NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup(); |
| | | internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | NetworkGroup defaultNetworkGroup = NetworkGroup |
| | | .getDefaultNetworkGroup(); |
| | | defaultNetworkGroup |
| | | .deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup? |
| | | NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup(); |
| | | adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup(); |
| | | defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | eclwf.deregister(); |
| | | eclwf.finalizeWorkflow(); |
| | | eclwf.deregister(); |
| | | eclwf.finalizeWorkflow(); |
| | | } |
| | | |
| | | eclwe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | eclwe = (ECLWorkflowElement) DirectoryServer |
| | | .getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe != null) |
| | | { |
| | | DirectoryServer.deregisterWorkflowElement(eclwe); |
| | | eclwe.finalizeWorkflowElement(); |
| | | } |
| | | |
| | | if (draftCNDbHandler != null) |
| | | draftCNDbHandler.shutdown(); |
| | | synchronized (draftCNLock) |
| | | { |
| | | if (draftCNDbHandler != null) |
| | | { |
| | | draftCNDbHandler.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn, |
| | | boolean create, boolean waitConnections) |
| | | { |
| | | ReplicationServerDomain replicationServerDomain; |
| | | ReplicationServerDomain domain; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | replicationServerDomain = baseDNs.get(baseDn); |
| | | if ((replicationServerDomain == null) && (create)) |
| | | domain = baseDNs.get(baseDn); |
| | | |
| | | if (domain != null ||!create) { |
| | | return domain; |
| | | } |
| | | |
| | | domain = new ReplicationServerDomain(baseDn, this); |
| | | baseDNs.put(baseDn, domain); |
| | | } |
| | | |
| | | if (waitConnections) |
| | | { |
| | | // Acquire a domain ticket and wait for a complete cycle of the connect |
| | | // thread. |
| | | final long myDomainTicket; |
| | | synchronized (connectThreadLock) |
| | | { |
| | | replicationServerDomain = new ReplicationServerDomain(baseDn, this); |
| | | baseDNs.put(baseDn, replicationServerDomain); |
| | | synchronized (domainMonitor) |
| | | // Connect thread must be waiting. |
| | | synchronized (domainTicketLock) |
| | | { |
| | | if (waitConnections) |
| | | // Determine the ticket which will be used in the next connect thread |
| | | // iteration. |
| | | myDomainTicket = domainTicket + 1; |
| | | } |
| | | |
| | | // Wake up connect thread. |
| | | connectThreadLock.notify(); |
| | | } |
| | | |
| | | // Wait until the connect thread has processed next connect phase. |
| | | synchronized (domainTicketLock) |
| | | { |
| | | // Condition. |
| | | while (myDomainTicket > domainTicket && !shutdown) |
| | | { |
| | | try |
| | | { |
| | | synchronized (this) |
| | | { |
| | | // kick up the connect thread so that this new domain |
| | | // gets connected to all the Replication Servers. |
| | | this.notify(); |
| | | } |
| | | try |
| | | { |
| | | // wait for the connect thread to signal that it finished its job |
| | | domainMonitor.wait(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | // Wait with timeout so that we detect shutdown. |
| | | domainTicketLock.wait(500); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Can't do anything with this. |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return replicationServerDomain; |
| | | return domain; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | // shutdown all the ChangelogCaches |
| | | for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | replicationServerDomain.shutdown(); |
| | | domain.shutdown(); |
| | | } |
| | | |
| | | shutdownECL(); |
| | |
| | | return new DbHandler(id, baseDn, this, dbEnv, queueSize); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Clears the generationId for the replicationServerDomain related to the |
| | | * provided baseDn. |
| | | * @param baseDn The baseDn for which to delete the generationId. |
| | | * @throws DatabaseException When it occurs. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which to delete the generationId. |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDn); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Ignore. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | } |
| | | } |
| | | |
| | | if (this.draftCNDbHandler != null) |
| | | synchronized (draftCNLock) |
| | | { |
| | | if (draftCNDbHandler != null) |
| | | { |
| | | try |
| | | { |
| | | try |
| | | draftCNDbHandler.clear(baseDn); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Ignore. |
| | | if (debugEnabled()) |
| | | { |
| | | draftCNDbHandler.clear(baseDn); |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | } |
| | | catch(Exception e){} |
| | | } |
| | | |
| | | try |
| | | { |
| | | lastGeneratedDraftCN = draftCNDbHandler.getLastKey(); |
| | | } |
| | | catch(Exception e) {} |
| | | catch (Exception e) |
| | | { |
| | | // Ignore. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(LogLevel.ALL, e); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | purgeDelay = newPurgeDelay; |
| | | // propagate |
| | | for (ReplicationServerDomain domain : baseDNs.values()) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | domain.setPurgeDelay(purgeDelay*1000); |
| | | } |
| | |
| | | |
| | | // Update threshold value for status analyzers (stop them if requested |
| | | // value is 0) |
| | | if (degradedStatusThreshold != configuration.getDegradedStatusThreshold()) |
| | | if (degradedStatusThreshold != configuration |
| | | .getDegradedStatusThreshold()) |
| | | { |
| | | int oldThresholdValue = degradedStatusThreshold; |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | for(ReplicationServerDomain rsd : baseDNs.values()) |
| | | degradedStatusThreshold = configuration |
| | | .getDegradedStatusThreshold(); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | if (degradedStatusThreshold == 0) |
| | | { |
| | | // Requested to stop analyzers |
| | | rsd.stopStatusAnalyzer(); |
| | | } else if (rsd.isRunningStatusAnalyzer()) |
| | | domain.stopStatusAnalyzer(); |
| | | } |
| | | else if (domain.isRunningStatusAnalyzer()) |
| | | { |
| | | // Update the threshold value for this running analyzer |
| | | rsd.updateStatusAnalyzer(degradedStatusThreshold); |
| | | } else if (oldThresholdValue == 0) |
| | | domain.updateStatusAnalyzer(degradedStatusThreshold); |
| | | } |
| | | else if (oldThresholdValue == 0) |
| | | { |
| | | // Requested to start analyzers with provided threshold value |
| | | if (rsd.getConnectedDSs().size() > 0) |
| | | rsd.startStatusAnalyzer(); |
| | | if (domain.getConnectedDSs().size() > 0) |
| | | domain.startStatusAnalyzer(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Update period value for monitoring publishers (stop them if requested |
| | | // value is 0) |
| | | if (monitoringPublisherPeriod != configuration.getMonitoringPeriod()) |
| | | if (monitoringPublisherPeriod != configuration |
| | | .getMonitoringPeriod()) |
| | | { |
| | | long oldMonitoringPeriod = monitoringPublisherPeriod; |
| | | monitoringPublisherPeriod = configuration.getMonitoringPeriod(); |
| | | for(ReplicationServerDomain rsd : baseDNs.values()) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | if (monitoringPublisherPeriod == 0L) |
| | | { |
| | | // Requested to stop monitoring publishers |
| | | rsd.stopMonitoringPublisher(); |
| | | } else if (rsd.isRunningMonitoringPublisher()) |
| | | domain.stopMonitoringPublisher(); |
| | | } |
| | | else if (domain.isRunningMonitoringPublisher()) |
| | | { |
| | | // Update the threshold value for this running monitoring publisher |
| | | rsd.updateMonitoringPublisher(monitoringPublisherPeriod); |
| | | } else if (oldMonitoringPeriod == 0L) |
| | | domain.updateMonitoringPublisher(monitoringPublisherPeriod); |
| | | } |
| | | else if (oldMonitoringPeriod == 0L) |
| | | { |
| | | // Requested to start monitoring publishers with provided period value |
| | | if ( (rsd.getConnectedDSs().size() > 0) || |
| | | (rsd.getConnectedRSs().size() > 0) ) |
| | | rsd.startMonitoringPublisher(); |
| | | if ((domain.getConnectedDSs().size() > 0) |
| | | || (domain.getConnectedRSs().size() > 0)) |
| | | domain.startMonitoringPublisher(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Changed the group id ? |
| | | byte newGroupId = (byte)configuration.getGroupId(); |
| | | byte newGroupId = (byte) configuration.getGroupId(); |
| | | if (newGroupId != groupId) |
| | | { |
| | | groupId = newGroupId; |
| | | // Have a new group id: Disconnect every servers. |
| | | for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | replicationServerDomain.stopAllServers(true); |
| | | domain.stopAllServers(true); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private void broadcastConfigChange() |
| | | { |
| | | for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | replicationServerDomain.buildAndSendTopoInfoToDSs(null); |
| | | replicationServerDomain.buildAndSendTopoInfoToRSs(); |
| | | domain.buildAndSendTopoInfoToDSs(null); |
| | | domain.buildAndSendTopoInfoToRSs(); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public Iterator<ReplicationServerDomain> getDomainIterator() |
| | | { |
| | | if (!baseDNs.isEmpty()) |
| | | return baseDNs.values().iterator(); |
| | | Collection<ReplicationServerDomain> domains = getReplicationServerDomains(); |
| | | if (!domains.isEmpty()) |
| | | { |
| | | return domains.iterator(); |
| | | } |
| | | else |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | rsd.clearDbs(); |
| | | } |
| | | } |
| | | if (this.draftCNDbHandler != null) |
| | | |
| | | synchronized (draftCNLock) |
| | | { |
| | | try |
| | | if (draftCNDbHandler != null) |
| | | { |
| | | try { draftCNDbHandler.clear(); } catch(Exception e){} |
| | | draftCNDbHandler.shutdown(); |
| | | try |
| | | { |
| | | draftCNDbHandler.clear(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Ignore. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | draftCNDbHandler.shutdown(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Ignore. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.WARNING, e); |
| | | } |
| | | } |
| | | |
| | | lastGeneratedDraftCN = 0; |
| | | draftCNDbHandler = null; |
| | | } |
| | | catch(Exception e) {} |
| | | } |
| | | } |
| | | |
| | |
| | | if (serversToDisconnect.isEmpty()) |
| | | return; |
| | | |
| | | for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) |
| | | for (ReplicationServerDomain domain: getReplicationServerDomains()) |
| | | { |
| | | replicationServerDomain.stopReplicationServers(serversToDisconnect); |
| | | domain.stopReplicationServers(serversToDisconnect); |
| | | } |
| | | } |
| | | |
| | |
| | | return replicationPort; |
| | | } |
| | | |
| | | // TODO: Remote monitor data cache lifetime is 500ms/should be configurable |
| | | private long monitorDataLifeTime = 500; |
| | | |
| | | /* The date of the last time they have been elaborated */ |
| | | private long monitorDataLastBuildDate = 0; |
| | | |
| | | /** |
| | | * This uniquely identifies a server (handler) in the cross-domain topology. |
| | | * Represents an identifier of a handler (in the whole RS) we have to wait a |
| | | * monitoring message from before answering to a monitor request. |
| | | */ |
| | | public static class GlobalServerId { |
| | | |
| | | private int serverId = -1; |
| | | private String baseDn = null; |
| | | |
| | | /** |
| | | * Constructor for a global server id. |
| | | * @param baseDn The dn of the RSD owning the handler. |
| | | * @param serverId The handler id in the matching RSD. |
| | | */ |
| | | public GlobalServerId(String baseDn, int serverId) { |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the server handler id. |
| | | * @return the serverId |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Get the base dn. |
| | | * @return the baseDn |
| | | */ |
| | | public String getBaseDn() |
| | | { |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | | * Get the hascode. |
| | | * @return The hashcode. |
| | | */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | int hash = 7; |
| | | hash = 43 * hash + this.serverId; |
| | | hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0); |
| | | return hash; |
| | | } |
| | | |
| | | /** |
| | | * Tests if the passed global server handler id represents the same server |
| | | * handler as this one. |
| | | * @param obj The object to test. |
| | | * @return True if both identifiers are the same. |
| | | */ |
| | | public boolean equals(Object obj) { |
| | | if ( (obj == null) || (!(obj instanceof GlobalServerId))) |
| | | return false; |
| | | |
| | | GlobalServerId globalServerId = (GlobalServerId)obj; |
| | | return ( globalServerId.baseDn.equals(baseDn) && |
| | | (globalServerId.serverId == serverId) ); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This gives the list of server handlers we are willing to wait monitoring |
| | | * message from. Each time a monitoring message is received by a server |
| | | * handler, the matching server handler id is retired from the list. When the |
| | | * list is empty, we received all expected monitoring messages. |
| | | */ |
| | | private List<GlobalServerId> expectedMonitoringMsg = null; |
| | | |
| | | /** |
| | | * Trigger the computation of the Global Monitoring Data. |
| | | * This should be called by all the MonitorProviders that need |
| | | * the global monitoring data to be updated before they can |
| | | * publish their information to cn=monitor. |
| | | * |
| | | * This method will trigger the update of all the global monitoring |
| | | * information of all the base-DNs of this replication Server. |
| | | * |
| | | * @throws DirectoryException If the computation cannot be achieved. |
| | | */ |
| | | public synchronized void computeMonitorData() throws DirectoryException |
| | | { |
| | | if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache"); |
| | | // The current data are still valid. No need to renew them. |
| | | return; |
| | | } |
| | | |
| | | // Initialize the list of server handlers we expect monitoring messages from |
| | | expectedMonitoringMsg = |
| | | Collections.synchronizedList(new ArrayList<GlobalServerId>()); |
| | | |
| | | // Copy the list of domains as a new domain may arrive or disappear between |
| | | // the initializeMonitorData and completeMonitorData calls |
| | | List<ReplicationServerDomain> rsdList = |
| | | new ArrayList<ReplicationServerDomain>(baseDNs.values()); |
| | | |
| | | for (ReplicationServerDomain domain : rsdList) |
| | | { |
| | | domain.initializeMonitorData(expectedMonitoringMsg); |
| | | } |
| | | |
| | | // Wait for responses |
| | | waitMonitorDataResponses(); |
| | | |
| | | for (ReplicationServerDomain domain : rsdList) |
| | | { |
| | | domain.completeMonitorData(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Wait for the expected received MonitorMsg. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | private void waitMonitorDataResponses() |
| | | throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getMonitorInstanceName() + |
| | | " waiting for " + expectedMonitoringMsg.size() + |
| | | " expected monitor messages"); |
| | | |
| | | // Wait up to 5 seconds for every expected monitoring message to come |
| | | // back. |
| | | boolean allReceived = false; |
| | | long startTime = TimeThread.getTime(); |
| | | long curTime = startTime; |
| | | int maxTime = 5000; |
| | | while ( (curTime - startTime) < maxTime ) |
| | | { |
| | | // Have every expected monitoring messages arrived ? |
| | | if (expectedMonitoringMsg.size() == 0) |
| | | { |
| | | // Ok break the loop |
| | | allReceived = true; |
| | | break; |
| | | } |
| | | Thread.sleep(100); |
| | | curTime = TimeThread.getTime(); |
| | | } |
| | | |
| | | monitorDataLastBuildDate = TimeThread.getTime(); |
| | | |
| | | if (!allReceived) |
| | | { |
| | | logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); |
| | | // let's go on in best effort even with limited data received. |
| | | } else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + getMonitorInstanceName() + |
| | | " Successfully received all expected monitor messages"); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This should be called by each ReplicationServerDomain that receives |
| | | * a response to a monitor request message. This may also be called when a |
| | | * monitoring message is coming from a RS whose monitoring publisher thread |
| | | * sent it. As monitoring messages (sent because of monitoring request or |
| | | * because of monitoring publisher) have the same content, this is also ok |
| | | * to mark ok the server when the monitoring message coms from a monitoring |
| | | * publisher thread. |
| | | * @param globalServerId The server handler that is receiving the |
| | | * monitoring message. |
| | | */ |
| | | public void responseReceived(GlobalServerId globalServerId) |
| | | { |
| | | expectedMonitoringMsg.remove(globalServerId); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This should be called when the Monitoring has failed and the |
| | | * Worker thread that is waiting for the result should be awaken. |
| | | */ |
| | | public void responseReceivedAll() |
| | | { |
| | | expectedMonitoringMsg.clear(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of domains managed by this replication server. |
| | | * @return the number of domains managed. |
| | | */ |
| | | public int getCacheSize() |
| | | { |
| | | return baseDNs.size(); |
| | | } |
| | | |
| | | /** |
| | | * Create a new session to get the ECL. |
| | | * @param msg The message that specifies the ECL request. |
| | |
| | | return eligibleCN; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get or create a handler on a Db on DraftCN for external changelog. |
| | | * |
| | | * @return the handler. |
| | | * @throws DirectoryException when needed. |
| | | * @throws DirectoryException |
| | | * when needed. |
| | | */ |
| | | public synchronized DraftCNDbHandler getDraftCNDbHandler() |
| | | throws DirectoryException |
| | | public DraftCNDbHandler getDraftCNDbHandler() |
| | | throws DirectoryException |
| | | { |
| | | try |
| | | synchronized (draftCNLock) |
| | | { |
| | | if (draftCNDbHandler == null) |
| | | try |
| | | { |
| | | draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv); |
| | | if (draftCNDbHandler == null) |
| | | return null; |
| | | this.lastGeneratedDraftCN = getLastDraftChangeNumber(); |
| | | { |
| | | draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv); |
| | | lastGeneratedDraftCN = getLastDraftChangeNumber(); |
| | | } |
| | | return draftCNDbHandler; |
| | | } |
| | | return draftCNDbHandler; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get("")); |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, |
| | | mb.toMessage(), e); |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get("")); |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, |
| | | mb.toMessage(), e); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public int getFirstDraftChangeNumber() |
| | | { |
| | | int first=0; |
| | | if (draftCNDbHandler != null) |
| | | first = draftCNDbHandler.getFirstKey(); |
| | | return first; |
| | | synchronized (draftCNLock) |
| | | { |
| | | if (draftCNDbHandler != null) |
| | | { |
| | | return draftCNDbHandler.getFirstKey(); |
| | | } |
| | | else |
| | | { |
| | | return 0; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getLastDraftChangeNumber() |
| | | { |
| | | int last=0; |
| | | if (draftCNDbHandler != null) |
| | | last = draftCNDbHandler.getLastKey(); |
| | | return last; |
| | | synchronized (draftCNLock) |
| | | { |
| | | if (draftCNDbHandler != null) |
| | | { |
| | | return draftCNDbHandler.getLastKey(); |
| | | } |
| | | else |
| | | { |
| | | return 0; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Generate a new Draft ChangeNumber. |
| | | * @return The generated Draft ChangeNUmber |
| | | */ |
| | | synchronized public int getNewDraftCN() |
| | | public int getNewDraftCN() |
| | | { |
| | | return ++lastGeneratedDraftCN; |
| | | synchronized (draftCNLock) |
| | | { |
| | | return ++lastGeneratedDraftCN; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | return weight; |
| | | } |
| | | |
| | | |
| | | |
| | | private Collection<ReplicationServerDomain> getReplicationServerDomains() |
| | | { |
| | | synchronized (baseDNs) |
| | | { |
| | | return new ArrayList<ReplicationServerDomain>(baseDNs.values()); |
| | | } |
| | | } |
| | | |
| | | } |