| | |
| | | * <p> |
| | | * Full Initialization of a replica can be triggered by LDAP clients |
| | | * by creating InitializeTasks or InitializeTargetTask. |
| | | * Full initialization can also by triggered from the ReplicationDomain |
| | | * implementation using methods {@link #initializeRemote(int)} |
| | | * or {@link #initializeFromRemote(int)}. |
| | | * Full initialization can also be triggered from the ReplicationDomain |
| | | * implementation using methods {@link #initializeRemote(int, Task)} |
| | | * or {@link #initializeFromRemote(int, Task)}. |
| | | * <p> |
| | | * At shutdown time, the {@link #disableService()} method should be called to |
| | | * cleanly stop the replication service. |
| | | */ |
| | | public abstract class ReplicationDomain |
| | | { |
| | | |
| | | /** |
| | | * Contains all the attributes included for the ECL (External Changelog). |
| | | */ |
| | | // @Immutable |
| | | private final static class ECLIncludes |
| | | { |
| | | |
| | | final Map<Integer, Set<String>> includedAttrsByServer; |
| | | final Set<String> includedAttrsAllServers; |
| | | |
| | | final Map<Integer, Set<String>> includedAttrsForDeletesByServer; |
| | | final Set<String> includedAttrsForDeletesAllServers; |
| | | |
| | | private ECLIncludes( |
| | | Map<Integer, Set<String>> includedAttrsByServer, |
| | | Set<String> includedAttrsAllServers, |
| | | Map<Integer, Set<String>> includedAttrsForDeletesByServer, |
| | | Set<String> includedAttrsForDeletesAllServers) |
| | | { |
| | | this.includedAttrsByServer = includedAttrsByServer; |
| | | this.includedAttrsAllServers = includedAttrsAllServers; |
| | | |
| | | this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer; |
| | | this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers; |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | public ECLIncludes() |
| | | { |
| | | this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP, |
| | | Collections.EMPTY_SET); |
| | | } |
| | | |
| | | /** |
| | | * Add attributes to be included in the ECL. |
| | | * |
| | | * @param serverId |
| | | * Server where these attributes are configured. |
| | | * @param includeAttributes |
| | | * Attributes to be included with all change records, may include |
| | | * wild-cards. |
| | | * @param includeAttributesForDeletes |
| | | * Additional attributes to be included with delete change records, |
| | | * may include wild-cards. |
| | | * @return a new {@link ECLIncludes} object if included attributes have |
| | | * changed, or the current object otherwise. |
| | | */ |
| | | public ECLIncludes addIncludedAttributes(int serverId, |
| | | Set<String> includeAttributes, Set<String> includeAttributesForDeletes) |
| | | { |
| | | boolean configurationChanged = false; |
| | | |
| | | Set<String> s1 = new HashSet<String>(includeAttributes); |
| | | |
| | | // Combine all+delete attributes. |
| | | Set<String> s2 = new HashSet<String>(s1); |
| | | s2.addAll(includeAttributesForDeletes); |
| | | |
| | | Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer; |
| | | if (!s1.equals(this.includedAttrsByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesByServer = new HashMap<Integer, Set<String>>( |
| | | this.includedAttrsByServer); |
| | | eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); |
| | | } |
| | | |
| | | Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | this.includedAttrsForDeletesByServer; |
| | | if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>( |
| | | this.includedAttrsForDeletesByServer); |
| | | eclIncludesForDeletesByServer.put( |
| | | serverId, Collections.unmodifiableSet(s2)); |
| | | } |
| | | |
| | | if (!configurationChanged) |
| | | { |
| | | return this; |
| | | } |
| | | |
| | | // and rebuild the global list to be ready for usage |
| | | Set<String> eclIncludesAllServer = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesByServer.values()) |
| | | { |
| | | eclIncludesAllServer.addAll(attributes); |
| | | } |
| | | |
| | | Set<String> eclIncludesForDeletesAllServer = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesForDeletesByServer.values()) |
| | | { |
| | | eclIncludesForDeletesAllServer.addAll(attributes); |
| | | } |
| | | return new ECLIncludes(eclIncludesByServer, |
| | | Collections.unmodifiableSet(eclIncludesAllServer), |
| | | eclIncludesForDeletesByServer, |
| | | Collections.unmodifiableSet(eclIncludesForDeletesAllServer)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Current status for this replicated domain. |
| | | */ |
| | |
| | | */ |
| | | private final CSNGenerator generator; |
| | | |
| | | private final Object eclIncludesLock = new Object(); |
| | | private final Map<Integer, Set<String>> eclIncludesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesAllServers = Collections.emptySet(); |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet(); |
| | | private final AtomicReference<ECLIncludes> eclIncludes = |
| | | new AtomicReference<ECLIncludes>(new ECLIncludes()); |
| | | |
| | | /** |
| | | * An object used to protect the initialization of the underlying broker |
| | |
| | | * Gets the info for Replicas in the topology (except us). |
| | | * @return The info for Replicas in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getReplicasList() |
| | | public Map<Integer, DSInfo> getReplicaInfos() |
| | | { |
| | | return broker.getDsList(); |
| | | return broker.getReplicaInfos(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * disconnected. Return null when no server with the provided serverId is |
| | | * connected. |
| | | * |
| | | * @param serverId The provided serverId of the remote replica |
| | | * @param dsId The provided serverId of the remote replica |
| | | * @return the info related to this remote server if it is connected, |
| | | * null if the server is NOT connected. |
| | | */ |
| | | public DSInfo isRemoteDSConnected(int serverId) |
| | | private DSInfo isRemoteDSConnected(int dsId) |
| | | { |
| | | for (DSInfo remoteDS : getReplicasList()) |
| | | { |
| | | if (remoteDS.getDsId() == serverId) |
| | | { |
| | | return remoteDS; |
| | | } |
| | | } |
| | | return null; |
| | | return getReplicaInfos().get(dsId); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | public List<RSInfo> getRsInfos() |
| | | { |
| | | return broker.getRsList(); |
| | | return broker.getRsInfos(); |
| | | } |
| | | |
| | | |
| | |
| | | * for an import, false if the IEContext |
| | | * will be used for an export. |
| | | */ |
| | | public IEContext(boolean importInProgress) |
| | | private IEContext(boolean importInProgress) |
| | | { |
| | | this.importInProgress = importInProgress; |
| | | this.startTime = System.currentTimeMillis(); |
| | |
| | | * @return A boolean indicating if a total update import is currently in |
| | | * Progress. |
| | | */ |
| | | public boolean importInProgress() |
| | | boolean importInProgress() |
| | | { |
| | | return importInProgress; |
| | | } |
| | |
| | | entryCount = total; |
| | | entryLeftCount = total; |
| | | |
| | | if (initializeTask != null) |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | ((InitializeTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | ((InitializeTargetTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTargetTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | final InitializeTask task = (InitializeTask) initializeTask; |
| | | task.setTotal(entryCount); |
| | | task.setLeft(entryCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | final InitializeTargetTask task = (InitializeTargetTask) initializeTask; |
| | | task.setTotal(entryCount); |
| | | task.setLeft(entryCount); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | public void updateCounters(int entriesDone) throws DirectoryException |
| | | private void updateCounters(int entriesDone) throws DirectoryException |
| | | { |
| | | entryLeftCount -= entriesDone; |
| | | |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "[ Entry count=" + this.entryCount + |
| | | return "[Entry count=" + this.entryCount + |
| | | ", Entry left count=" + this.entryLeftCount + "]"; |
| | | } |
| | | |
| | |
| | | * @param serverId serverId of the acknowledger/receiver/importer server. |
| | | * @param numAck id of the message received. |
| | | */ |
| | | public void setAckVal(int serverId, int numAck) |
| | | private void setAckVal(int serverId, int numAck) |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | |
| | | if (target >= 0) |
| | | { |
| | | // FIXME Could we check now that it is a known server in the domain ? |
| | | // JNR: Yes please |
| | | } |
| | | return target; |
| | | } |
| | |
| | | * |
| | | * @param target The server-id of the server that should be initialized. |
| | | * The target can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * {@link #getReplicaInfos()} method. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | |
| | | logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL, |
| | | countEntries(), getBaseDNString(), getServerId()); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | ieCtx.startList.add(dsi.getDsId()); |
| | | } |
| | | ieCtx.startList.addAll(getReplicaInfos().keySet()); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | |
| | | ieCtx.startList.add(serverToInitialize); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get( |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(), |
| | | ieCtx.failureList)); |
| | | } |
| | | |
| | |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName() |
| | | + " export ends with " + " connected=" + broker.isConnected() |
| | | + " export ends with connected=" + broker.isConnected() |
| | | + " exportRootException=" + exportRootException); |
| | | |
| | | if (exportRootException != null) |
| | |
| | | do |
| | | { |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | |
| | | considered in the processing of sorting the successfully initialized |
| | | and the others |
| | | */ |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | replicasWeAreWaitingFor.add(dsi.getDsId()); |
| | | } |
| | | replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet()); |
| | | |
| | | boolean done; |
| | | do |
| | |
| | | done = false; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | | } |
| | | |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | |
| | | Thread.currentThread().interrupt(); |
| | | } // 1sec |
| | | } |
| | | |
| | | } |
| | | while (!done && !broker.shuttingDown()); // infinite wait |
| | | |
| | |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] Entering exportLDIFEntry entry=" + |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initializes this domain from another source server. |
| | | * <p> |
| | | * When this method is called, a request for initialization will |
| | | * be sent to the source server asking for initialization. |
| | | * <p> |
| | | * The {@code exportBackend(OutputStream)} will therefore be called |
| | | * on the source server, and the {@code importBackend(InputStream)} |
| | | * will be called on this server. |
| | | * <p> |
| | | * The InputStream and OutputStream given as a parameter to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param source The server-id of the source from which to initialize. |
| | | * The source can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeFromRemote(int source) throws DirectoryException |
| | | { |
| | | initializeFromRemote(source, null); |
| | | } |
| | | |
| | | /** |
| | | * Initializes a remote server from this server. |
| | | * <p> |
| | | * The {@code exportBackend(OutputStream)} will therefore be called |
| | | * on this server, and the {@code importBackend(InputStream)} |
| | | * will be called on the remote server. |
| | | * <p> |
| | | * The InputStream and OutputStream given as a parameter to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param target The server-id of the server that should be initialized. |
| | | * The target can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeRemote(int target) throws DirectoryException |
| | | { |
| | | initializeRemote(target, null); |
| | | } |
| | | |
| | | /** |
| | | * Initializes asynchronously this domain from a remote source server. |
| | | * Before returning from this call, for the provided task : |
| | | * - the progressing counters are updated during the initialization using |
| | |
| | | * |
| | | * @param source The server-id of the source from which to initialize. |
| | | * The source can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * {@link #getReplicaInfos()} method. |
| | | * |
| | | * @param initTask The task that launched the initialization |
| | | * and should be updated of its progress. |
| | |
| | | * task has initially been created (this server, |
| | | * or the remote server). |
| | | */ |
| | | void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | private void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | int requesterServerId) |
| | | { |
| | | InitializeTask initFromTask = null; |
| | |
| | | ieCtx.importSource = source; |
| | | ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount()); |
| | | ieCtx.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | // Protocol version is -1 when not known. |
| | | ieCtx.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromTask = (InitializeTask) ieCtx.initializeTask; |
| | | |
| | |
| | | * @param dsServerId The provided serverId. |
| | | * @return The protocol version. |
| | | */ |
| | | short getProtocolVersion(int dsServerId) |
| | | private short getProtocolVersion(int dsServerId) |
| | | { |
| | | short protocolVersion = -1; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | final DSInfo dsInfo = getReplicaInfos().get(dsServerId); |
| | | if (dsInfo != null) |
| | | { |
| | | if (dsi.getDsId() == dsServerId) |
| | | { |
| | | protocolVersion = dsi.getProtocolVersion(); |
| | | break; |
| | | } |
| | | return dsInfo.getProtocolVersion(); |
| | | } |
| | | return protocolVersion; |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | |
| | | for (int i = 0; i< 50; i++) |
| | | { |
| | | allSet = true; |
| | | for (RSInfo rsInfo : getRsList()) |
| | | for (RSInfo rsInfo : getRsInfos()) |
| | | { |
| | | // the 'empty' RSes (generationId==-1) are considered as good citizens |
| | | if (rsInfo.getGenerationId() != -1 && |
| | |
| | | * connected to a Replication Server or it |
| | | * was not possible to contact it. |
| | | */ |
| | | public void resetReplicationLog() throws DirectoryException |
| | | void resetReplicationLog() throws DirectoryException |
| | | { |
| | | // Reset the Generation ID to -1 to clean the ReplicationServers. |
| | | resetGenerationId(-1L); |
| | |
| | | { |
| | | // create the broker object used to publish and receive changes |
| | | broker = new ReplicationBroker( |
| | | this, state, config, getGenerationID(), new ReplSessionSecurity()); |
| | | this, state, config, new ReplSessionSecurity()); |
| | | broker.start(); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Applies a configuration change to the attributes which should be be |
| | | * included in the ECL. |
| | | * Applies a configuration change to the attributes which should be included |
| | | * in the ECL. |
| | | * |
| | | * @param includeAttributes |
| | | * attributes to be included with all change records. |
| | |
| | | * @param msg The byte array containing the information that should |
| | | * be sent to the remote entities. |
| | | */ |
| | | public void publish(byte[] msg) |
| | | void publish(byte[] msg) |
| | | { |
| | | UpdateMsg update; |
| | | synchronized (this) |
| | |
| | | Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | boolean configurationChanged = false; |
| | | |
| | | synchronized (eclIncludesLock) |
| | | ECLIncludes current; |
| | | ECLIncludes updated; |
| | | do |
| | | { |
| | | Set<String> s1 = new HashSet<String>(includeAttributes); |
| | | |
| | | // Combine all+delete attributes. |
| | | Set<String> s2 = new HashSet<String>(s1); |
| | | s2.addAll(includeAttributesForDeletes); |
| | | |
| | | if (!s1.equals(eclIncludesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); |
| | | } |
| | | |
| | | if (!s2.equals(eclIncludesForDeletesByServer.get(serverId))) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesForDeletesByServer.put(serverId, |
| | | Collections.unmodifiableSet(s2)); |
| | | } |
| | | |
| | | // and rebuild the global list to be ready for usage |
| | | Set<String> s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesAllServers = Collections.unmodifiableSet(s); |
| | | |
| | | s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesForDeletesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s); |
| | | current = this.eclIncludes.get(); |
| | | updated = current.addIncludedAttributes( |
| | | serverId, includeAttributes, includeAttributesForDeletes); |
| | | } |
| | | |
| | | return configurationChanged; |
| | | while (!this.eclIncludes.compareAndSet(current, updated)); |
| | | return current != updated; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public Set<String> getEclIncludes() |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesAllServers; |
| | | } |
| | | return eclIncludes.get().includedAttrsAllServers; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public Set<String> getEclIncludesForDeletes() |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesAllServers; |
| | | } |
| | | return eclIncludes.get().includedAttrsForDeletesAllServers; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | Set<String> getEclIncludes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesByServer.get(serverId); |
| | | } |
| | | return eclIncludes.get().includedAttrsByServer.get(serverId); |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | Set<String> getEclIncludesForDeletes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesByServer.get(serverId); |
| | | } |
| | | return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId); |
| | | } |
| | | |
| | | /** |