| | |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.*; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | |
| | | */ |
| | | private final ChangeNumberGenerator generator; |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludeByServer = |
| | | new ConcurrentHashMap<Integer, Set<String>>(); |
| | | Set<String> crossServersECLIncludes = new HashSet<String>(); |
| | | private final Object eclIncludesLock = new Object(); |
| | | private final Map<Integer, Set<String>> eclIncludesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesAllServers = Collections.emptySet(); |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections |
| | | .emptySet(); |
| | | |
| | | /** |
| | | * An object used to protect the initialization of the underlying broker |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Change some ReplicationDomain parameters : the ECL include attribute. |
| | | * Applies a configuration change to the attributes which should be be |
| | | * included in the ECL. |
| | | * |
| | | * @param newECLInclude The new ECL attribute. |
| | | * @param includeAttributes |
| | | * attributes to be included with all change records. |
| | | * @param includeAttributesForDeletes |
| | | * additional attributes to be included with delete change records. |
| | | */ |
| | | public void changeConfig(Set<String> newECLInclude) |
| | | public void changeConfig(Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | boolean configECLIncludeChanged = false; |
| | | Set<String> currentECLInclude = this.getEclInclude(serverID); |
| | | |
| | | if (newECLInclude.size() != currentECLInclude.size()) |
| | | if (setEclIncludes(serverID, includeAttributes, |
| | | includeAttributesForDeletes)) |
| | | { |
| | | configECLIncludeChanged = true; |
| | | } |
| | | else |
| | | { |
| | | // compare current config and new config |
| | | for (String attr : currentECLInclude) |
| | | { |
| | | if (!newECLInclude.contains(attr)) |
| | | { |
| | | configECLIncludeChanged = true; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (configECLIncludeChanged) |
| | | { |
| | | // set new config |
| | | this.setEclInclude(this.serverID, newECLInclude); |
| | | if (broker != null) |
| | | { |
| | | disableService(); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * This method should trigger an export of the replicated data. |
| | | * to the provided outputStream. |
| | |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Set the attributes configured on a server to be included in the ECL. |
| | | * @param serverId server where these attributes are configured. |
| | | * @param attributes the configured attributes. |
| | | * Set the attributes configured on a server to be included in the ECL. |
| | | * |
| | | * @param serverId |
| | | * Server where these attributes are configured. |
| | | * @param includeAttributes |
| | | * Attributes to be included with all change records, may include |
| | | * wild-cards. |
| | | * @param includeAttributesForDeletes |
| | | * Additional attributes to be included with delete change records, |
| | | * may include wild-cards. |
| | | * @return {@code true} if the set of attributes was modified. |
| | | */ |
| | | public void setEclInclude(int serverId, Set<String> attributes) |
| | | public boolean setEclIncludes(int serverId, |
| | | Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | synchronized(eclIncludeByServer) |
| | | boolean configurationChanged = false; |
| | | |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | eclIncludeByServer.put(serverId, attributes); |
| | | Set<String> s1 = new HashSet<String>(includeAttributes); |
| | | |
| | | // Combine all+delete attributes. |
| | | Set<String> s2 = new HashSet<String>(s1); |
| | | s2.addAll(includeAttributesForDeletes); |
| | | |
| | | Set<String> s = eclIncludesByServer.get(serverId); |
| | | if (!s1.equals(s)) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); |
| | | } |
| | | |
| | | s = eclIncludesForDeletesByServer.get(serverId); |
| | | if (!s2.equals(s)) |
| | | { |
| | | configurationChanged = true; |
| | | eclIncludesForDeletesByServer.put(serverId, |
| | | Collections.unmodifiableSet(s2)); |
| | | } |
| | | |
| | | // and rebuild the global list to be ready for usage |
| | | crossServersECLIncludes.clear(); |
| | | for (Set<String> attributesByServer : eclIncludeByServer.values()) |
| | | for (String attribute : attributesByServer) |
| | | crossServersECLIncludes.add(attribute); |
| | | s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesAllServers = Collections.unmodifiableSet(s); |
| | | |
| | | s = new HashSet<String>(); |
| | | for (Set<String> attributes : eclIncludesForDeletesByServer.values()) |
| | | { |
| | | s.addAll(attributes); |
| | | } |
| | | eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s); |
| | | } |
| | | |
| | | return configurationChanged; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the attributes to include in each change for the ECL. |
| | | * It's a set : an attribute appears once even if configured on more than one |
| | | * server. |
| | | * |
| | | * @return The attributes to include in each change for the ECL. |
| | | */ |
| | | public Set<String> getEclInclude() |
| | | public Set<String> getEclIncludes() |
| | | { |
| | | return crossServersECLIncludes; |
| | | } |
| | | |
| | | /** |
| | | * Get the attributes to include in each change for the ECL |
| | | * for a given serverId. |
| | | * @param serverId The serverId for which we want the include attributes. |
| | | * @return The attributes. |
| | | */ |
| | | public Set<String> getEclInclude(int serverId) |
| | | { |
| | | synchronized(eclIncludeByServer) |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludeByServer.get(serverId); |
| | | return eclIncludesAllServers; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the attributes to include in each delete change for the ECL. |
| | | * |
| | | * @return The attributes to include in each delete change for the ECL. |
| | | */ |
| | | public Set<String> getEclIncludesForDeletes() |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesAllServers; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the attributes to include in each change for the ECL for a given |
| | | * serverId. |
| | | * |
| | | * @param serverId |
| | | * The serverId for which we want the include attributes. |
| | | * @return The attributes. |
| | | */ |
| | | public Set<String> getEclIncludes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesByServer.get(serverId); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the attributes to include in each change for the ECL for a given |
| | | * serverId. |
| | | * |
| | | * @param serverId |
| | | * The serverId for which we want the include attributes. |
| | | * @return The attributes. |
| | | */ |
| | | public Set<String> getEclIncludesForDeletes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | | return eclIncludesForDeletesByServer.get(serverId); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the ChangeNUmber of the last Change that was fully processed |
| | | * by this ReplicationDomain. |