| | |
| | | base dn : %s |
| | | MILD_ERR_ERROR_SEARCHING_RUV_15=Error %s when searching for server state %s : \ |
| | | %s base dn : %s |
| | | NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \ |
| | | server should be configured |
| | | SEVERE_ERR_EXCEPTION_SENDING_TOPO_INFO_20=Caught IOException while sending \ |
| | | topology info (for update) on domain %s for %s server %s : %s |
| | | MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \ |
| | |
| | | private final SortedMap<CSN, FakeOperation> replayOperations = |
| | | new TreeMap<CSN, FakeOperation>(); |
| | | |
| | | /** |
| | | * The isolation policy that this domain is going to use. |
| | | * This field describes the behavior of the domain when an update is |
| | | * attempted and the domain could not connect to any Replication Server. |
| | | * Possible values are accept-updates or deny-updates, but other values |
| | | * may be added in the future. |
| | | */ |
| | | private IsolationPolicy isolationPolicy; |
| | | |
| | | /** |
| | | * The DN of the configuration entry of this domain. |
| | | */ |
| | | private final DN configDn; |
| | | private ReplicationDomainCfg config; |
| | | private ExternalChangelogDomain eclDomain; |
| | | |
| | | /** |
| | |
| | | private static final int FRACTIONAL_BECOME_NO_OP = 3; |
| | | |
| | | /** |
| | | * This configuration boolean indicates if this ReplicationDomain should log |
| | | * CSNs. |
| | | */ |
| | | private boolean logCSN = false; |
| | | |
| | | /** |
| | | * This configuration integer indicates the time the domain keeps the |
| | | * historical information necessary to solve conflicts.<br> |
| | | * When a change stored in the historical part of the user entry has a date |
| | | * (from its replication CSN) older than this delay, it is candidate to be |
| | | * purged. |
| | | */ |
| | | private long histPurgeDelayInMilliSec = 0; |
| | | |
| | | /** |
| | | * The last CSN purged in this domain. Allows a continuous purging |
| | | * process from one purge run (task run) to the next one. Has the value 0 when |
| | | * the server starts. |
| | |
| | | * @throws ConfigException In case of invalid configuration. |
| | | */ |
| | | public LDAPReplicationDomain(ReplicationDomainCfg configuration, |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | throws ConfigException |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException |
| | | { |
| | | super(configuration.getBaseDN(), |
| | | configuration.getServerId(), |
| | | configuration.getInitializationWindowSize()); |
| | | |
| | | // Read the configuration parameters. |
| | | Set<String> replicationServers = configuration.getReplicationServer(); |
| | | |
| | | int window = configuration.getWindowSize(); |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | long heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | |
| | | this.isolationPolicy = configuration.getIsolationPolicy(); |
| | | this.configDn = configuration.dn(); |
| | | this.logCSN = configuration.isLogChangenumber(); |
| | | this.config = configuration; |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | this.histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | // Get assured configuration |
| | | readAssuredConfig(configuration, false); |
| | |
| | | // register as an AlertGenerator |
| | | DirectoryServer.registerAlertGenerator(this); |
| | | |
| | | startPublishService(replicationServers, window, heartbeatInterval, |
| | | configuration.getChangetimeHeartbeatInterval()); |
| | | startPublishService(configuration); |
| | | } |
| | | |
| | | /** |
| | |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | | |
| | | // FIXME should the next call use the initWindow parameter rather than the |
| | | // instance variable? |
| | | super.initializeRemote(target, requestorID, initTask, this.initWindow); |
| | | } |
| | | |
| | |
| | | */ |
| | | private boolean brokerIsConnected() |
| | | { |
| | | final IsolationPolicy isolationPolicy = config.getIsolationPolicy(); |
| | | if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) |
| | | { |
| | | // this policy implies that we always accept updates. |
| | |
| | | // Note that a failed non-replication operation might not have a change |
| | | // number. |
| | | CSN curCSN = OperationContext.getCSN(op); |
| | | if (curCSN != null && logCSN) |
| | | if (curCSN != null && config.isLogChangenumber()) |
| | | { |
| | | op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), |
| | | "replicationCSN", curCSN)); |
| | |
| | | { |
| | | // If the base entry does not exist, save the generation |
| | | // ID in the config entry |
| | | result = runSaveGenerationId(configDn, generationId); |
| | | result = runSaveGenerationId(config.dn(), generationId); |
| | | } |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | |
| | | { |
| | | // if the base entry does not exist look for the generationID |
| | | // in the config entry. |
| | | search = conn.processSearch(configDn.toString(), |
| | | search = conn.processSearch(config.dn().toString(), |
| | | SearchScope.BASE_OBJECT, |
| | | DereferencePolicy.DEREF_ALWAYS, 0, 0, false, |
| | | filter,attributes); |
| | |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ReplicationDomainCfg configuration) |
| | | { |
| | | isolationPolicy = configuration.getIsolationPolicy(); |
| | | logCSN = configuration.isLogChangenumber(); |
| | | histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | changeConfig( |
| | | configuration.getReplicationServer(), |
| | | configuration.getWindowSize(), |
| | | configuration.getHeartbeatInterval(), |
| | | (byte)configuration.getGroupId()); |
| | | this.config = configuration; |
| | | changeConfig(configuration); |
| | | |
| | | // Read assured + fractional configuration and each time reconnect if needed |
| | | readAssuredConfig(configuration, true); |
| | |
| | | @Override |
| | | public DN getComponentEntryDN() |
| | | { |
| | |   // The DN is read from the current configuration object, the same source |
| | |   // the other users of config.dn() in this class rely on. Only one return |
| | |   // statement is kept: a second one would be unreachable code. |
| | |   return config.dn(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn); |
| | | DN eclConfigEntryDN = DN.decode("cn=external changeLog," + config.dn()); |
| | | if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) |
| | | { |
| | | DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null); |
| | |
| | | // unit test cases |
| | | try |
| | | { |
| | | DN configDn = config.dn(); |
| | | if (DirectoryServer.getConfigHandler().entryExists(configDn)) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the minimum time (in ms) that the domain keeps the historical |
| | | * information necessary to solve conflicts. |
| | | * |
| | | * @return the purge delay, in milliseconds. |
| | | */ |
| | | public long getHistoricalPurgeDelay() |
| | | { |
| | |   // The configured delay is scaled by 60 * 1000 to obtain milliseconds |
| | |   // (presumably it is expressed in minutes - confirm against the config |
| | |   // definition). Long literals keep the multiplication in 64-bit |
| | |   // arithmetic whatever the type returned by the configuration getter. |
| | |   return config.getConflictsHistoricalPurgeDelay() * 60L * 1000L; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); |
| | | lastCSNPurgedFromHist = entryHist.getOldestCSN(); |
| | | entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec); |
| | | entryHist.setPurgeDelay(getHistoricalPurgeDelay()); |
| | | Attribute attr = entryHist.encodeAndPurge(); |
| | | count += entryHist.getLastPurgedValuesCount(); |
| | | List<Modification> mods = new LinkedList<Modification>(); |
| | |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private volatile boolean shutdown = false; |
| | | private final Object startStopLock = new Object(); |
| | | /** |
| | | * Replication server URLs under this format: "<code>hostname:port</code>". |
| | | */ |
| | | private volatile Set<String> replicationServerUrls; |
| | | private volatile ReplicationDomainCfg config; |
| | | private volatile boolean connected = false; |
| | | /** |
| | | * String reported under CSN=monitor when there is no connected RS. |
| | |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session; |
| | | private final ServerState state; |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow = 100; |
| | | private int halfRcvWindow = rcvWindow / 2; |
| | | private int maxRcvWindow = rcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | /** My group id. */ |
| | | private byte groupId = -1; |
| | | /** The group id of the RS we are connected to. */ |
| | | private byte rsGroupId = -1; |
| | | /** The server id of the RS we are connected to. */ |
| | |
| | | private Map<Integer, ServerState> replicaStates = |
| | | new HashMap<Integer, ServerState>(); |
| | | /** |
| | | * The expected duration in milliseconds between heartbeats received |
| | | * from the replication server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | /** |
| | | * A thread to monitor heartbeats on the session. |
| | | */ |
| | | private HeartbeatMonitor heartbeatMonitor; |
| | |
| | | * change time of this DS. |
| | | */ |
| | | private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; |
| | | /** |
| | | * The expected period in milliseconds between sends of these messages |
| | | * to the replication server. Zero means heartbeats are off. |
| | | */ |
| | | private long changeTimeHeartbeatSendInterval = 0; |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | |
| | | * @param replicationDomain The replication domain that is creating us. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDN The base DN that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param config The configuration to use. |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverId and for the domain associated to the provided baseDN. |
| | | * @param heartbeatInterval The interval (in ms) between heartbeats requested |
| | | * from the replicationServer, or zero if no heartbeats are requested. |
| | | * @param replSessionSecurity The session security configuration. |
| | | * @param groupId The group id of our domain. |
| | | * @param changeTimeHeartbeatInterval The interval (in ms) between Change |
| | | * time heartbeats are sent to the RS, |
| | | * or zero if no CSN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, DN baseDN, int serverId, int window, |
| | | long generationId, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId, |
| | | long changeTimeHeartbeatInterval) |
| | | ServerState state, ReplicationDomainCfg config, long generationId, |
| | | ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.domain = replicationDomain; |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | this.state = state; |
| | | this.config = config; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.groupId = groupId; |
| | | this.generationID = generationId; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | this.halfRcvWindow = rcvWindow / 2; |
| | | |
| | | /* |
| | | * Only create a monitor if there is a replication domain (this is not the |
| | |
| | | synchronized (startStopLock) |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Start the ReplicationBroker. |
| | | * |
| | | * @param replicationServers list of servers used |
| | | */ |
| | | public void start(Set<String> replicationServers) |
| | | { |
| | | synchronized (startStopLock) |
| | | { |
| | | // Open Socket to the ReplicationServer Send the Start message |
| | | shutdown = false; |
| | | this.replicationServerUrls = replicationServers; |
| | | |
| | | if (this.replicationServerUrls.size() < 1) |
| | | { |
| | | Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); |
| | | logError(message); |
| | | } |
| | | |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public int getServerId() |
| | | { |
| | |   // Read the server id from the current configuration, like the sibling |
| | |   // getters getBaseDN() and getGroupId(). Only one return statement is |
| | |   // kept: a second one would be unreachable code. |
| | |   return config.getServerId(); |
| | | } |
| | | |
| | | private DN getBaseDN() |
| | | { |
| | | return config.getBaseDN(); |
| | | } |
| | | |
| | | private Set<String> getReplicationServerUrls() |
| | | { |
| | | return config.getReplicationServer(); |
| | | } |
| | | |
| | | private byte getGroupId() |
| | | { |
| | | return (byte) config.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | replicationServerInfo.setLocallyConfigured(false); |
| | | return; |
| | | } |
| | | for (String serverUrl : replicationServerUrls) |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | if (isSameReplicationServerUrl(serverUrl, rsUrl)) |
| | | { |
| | |
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDN.toNormalizedString().equalsIgnoreCase( |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | |
| | | Map<Integer, ReplicationServerInfo> rsInfos = |
| | | new ConcurrentHashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (String serverUrl : replicationServerUrls) |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server and get info about it |
| | | ReplicationServerInfo replicationServerInfo = |
| | |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | String bestServer = this.replicationServerUrls.iterator().next(); |
| | | String bestServer = getReplicationServerUrls().iterator().next(); |
| | | |
| | | if (performPhaseOneHandshake(bestServer, true, true) != null) |
| | | { |
| | |
| | | |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | |
| | | /* |
| | | * Connect to each replication server and get their ServerState then find |
| | | * out which one is the best to connect to. |
| | |
| | | { |
| | | // At least one server answered, find the best one. |
| | | electedRsInfo = computeBestReplicationServer(true, -1, state, |
| | | replicationServerInfos, serverId, groupId, getGenerationID()); |
| | | replicationServerInfos, serverId, getGroupId(), getGenerationID()); |
| | | |
| | | // Best found, now initialize connection to this one (handshake phase 1) |
| | | if (debugEnabled()) |
| | |
| | | if (replicationServerInfos.size() > 0) |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, |
| | | baseDN.toNormalizedString(), |
| | | serverId, baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", ")); |
| | | logError(message); |
| | | } |
| | |
| | | private void connectToReplicationServer(ReplicationServerInfo rsInfo, |
| | | ServerStatus initStatus, TopologyMsg topologyMsg) |
| | | { |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | try |
| | | { |
| | | replicationServer = session.getReadableRemoteAddress(); |
| | |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(maxSendWindow); |
| | | rcvWindow = maxRcvWindow; |
| | | rcvWindow = getMaxRcvWindow(); |
| | | connected = true; |
| | | |
| | | /* |
| | |
| | | .getGenerationId(), session); |
| | | } |
| | | |
| | | final byte groupId = getGroupId(); |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | | /* |
| | |
| | | int nChanges = ServerState.diffChanges(rsState, state); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDN + " and with server id " |
| | | + serverId + " computed " + nChanges + " changes late."); |
| | | TRACER.debugInfo("RB for dn " + getBaseDN() + " and with server id " |
| | | + getServerId() + " computed " + nChanges + " changes late."); |
| | | } |
| | | |
| | | /* |
| | |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(serverId, url, |
| | | baseDN, maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg + "\nAND RECEIVED:\n" + msg); |
| | | } |
| | | |
| | |
| | | |
| | | // Sanity check |
| | | DN repDN = replServerInfo.getBaseDN(); |
| | | if (!baseDN.equals(repDN)) |
| | | if (!getBaseDN().equals(repDN)) |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get( |
| | | repDN.toNormalizedString(), baseDN.toNormalizedString()); |
| | | repDN.toNormalizedString(), getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | |
| | |
| | | } |
| | | catch (ConnectException e) |
| | | { |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId, |
| | | server, baseDN.toNormalizedString()); |
| | | errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId, |
| | | server, baseDN.toNormalizedString()); |
| | | errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString()); |
| | | return null; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(), |
| | | server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | return null; |
| | | } |
| | | finally |
| | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startECLSessionMsg); |
| | | } |
| | | |
| | |
| | | connected = true; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + getBaseDN() + "\nRB HANDSHAKE SENT:\n" |
| | | + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); |
| | | } |
| | | |
| | |
| | | return topologyMsg; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( |
| | | getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, |
| | | byte groupId, long generationId) |
| | | { |
| | | |
| | | // Shortcut, if only one server, this is the best |
| | | if (rsInfos.size() == 1) |
| | | { |
| | |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.isLocallyConfigured()) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.isLocallyConfigured()) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | return result; |
| | |
| | | { |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGroupId() == groupId) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGroupId() == groupId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | return result; |
| | |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | boolean emptyState = true; |
| | | |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGenerationId() == generationId) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == generationId) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | if (!replicationServerInfo.serverState.isEmpty()) |
| | | result.put(entry.getKey(), rsInfo); |
| | | if (!rsInfo.serverState.isEmpty()) |
| | | emptyState = false; |
| | | } |
| | | } |
| | |
| | | { |
| | | // If all the RSs that have a generationId have an empty state, |
| | | // then the 'empty' (genId=-1) RSs are also candidates |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | if (replicationServerInfo.getGenerationId() == -1) |
| | | ReplicationServerInfo rsInfo = entry.getValue(); |
| | | if (rsInfo.getGenerationId() == -1) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(entry.getKey(), rsInfo); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | | * Find replication servers that are up to date (or more up to date than us, |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | | * RS but without having time to store our own state) regarding our own |
| | | * server id. If some servers more up to date, prefer this list but take |
| | | * server id. If some servers are more up to date, prefer this list but take |
| | | * only the latest CSN. |
| | | */ |
| | | CSN latestRsCSN = null; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | ServerState rsState = replicationServerInfo.getServerState(); |
| | | CSN rsCSN = rsState.getCSN(localServerId); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | CSN rsCSN = rsInfo.getServerState().getCSN(localServerId); |
| | | if (rsCSN == null) |
| | | { |
| | | rsCSN = new CSN(0, 0, localServerId); |
| | |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | | upToDateServers.put(rsId, replicationServerInfo); |
| | | upToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | { |
| | | // This replication server is even more up to date than the local |
| | |
| | | { |
| | | if (rsCSN.equals(latestRsCSN)) |
| | | { |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | } else |
| | | { |
| | | // This RS is even more up to date, clear the list and store this |
| | | // new RS |
| | | moreUpToDateServers.clear(); |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | moreUpToDateServers.put(rsId, rsInfo); |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | } |
| | |
| | | * Initially look for all servers on the same host. If we find one in the |
| | | * same VM, then narrow the search. |
| | | */ |
| | | boolean filterServersInSameVM = false; |
| | | Map<Integer, ReplicationServerInfo> result = |
| | | boolean foundRSInSameVM = false; |
| | | final Map<Integer, ReplicationServerInfo> result = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | final HostPort hp = |
| | | HostPort.valueOf(replicationServerInfo.getServerURL()); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | final HostPort hp = HostPort.valueOf(rsInfo.getServerURL()); |
| | | if (hp.isLocalAddress()) |
| | | { |
| | | if (isLocalReplicationServerPort(hp.getPort())) |
| | | { |
| | | // An RS in the same VM will always have priority. |
| | | if (!filterServersInSameVM) |
| | | if (!foundRSInSameVM) |
| | | { |
| | | // An RS in the same VM will always have priority. |
| | | // Narrow the search to only include servers in this VM. |
| | | result.clear(); |
| | | filterServersInSameVM = true; |
| | | foundRSInSameVM = true; |
| | | } |
| | | result.put(rsId, replicationServerInfo); |
| | | result.put(rsId, rsInfo); |
| | | } |
| | | else if (!filterServersInSameVM) |
| | | else if (!foundRSInSameVM) |
| | | { |
| | | result.put(rsId, replicationServerInfo); |
| | | // OK, accept RSs on the same machine because we have not found an RS |
| | | // in the same VM yet |
| | | result.put(rsId, rsInfo); |
| | | } |
| | | else |
| | | { |
| | |
| | | Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>(); |
| | | // Precision for the operations (number of digits after the dot) |
| | | final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); |
| | | for (Integer rsId : bestServers.keySet()) |
| | | for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | final Integer rsId = entry.getKey(); |
| | | final ReplicationServerInfo rsInfo = entry.getValue(); |
| | | |
| | | int rsWeight = replicationServerInfo.getWeight(); |
| | | // load goal = rs weight / sum of weights |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide( |
| | | BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide( |
| | | BigDecimal.valueOf(sumOfWeights), mathContext); |
| | | BigDecimal currentLoadBd = BigDecimal.ZERO; |
| | | if (sumOfConnectedDSs != 0) |
| | | { |
| | | // current load = number of connected DSs / total number of DSs |
| | | int connectedDSs = replicationServerInfo.getConnectedDSNumber(); |
| | | int connectedDSs = rsInfo.getConnectedDSNumber(); |
| | | currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( |
| | | BigDecimal.valueOf(sumOfConnectedDSs), mathContext); |
| | | } |
| | |
| | | mathContext); |
| | | |
| | | /* |
| | | Now compare both values: we must no disconnect the DS if this |
| | | Now compare both values: we must not disconnect the DS if this |
| | | is for going in a situation where the load distance of the other |
| | | RSs is the opposite of the future load distance of the local RS |
| | | or we would evaluate that we should disconnect just after being |
| | |
| | | private void startRSHeartBeatMonitoring() |
| | | { |
| | |   // Start a heartbeat monitor thread, but only when the configured |
| | |   // heartbeat interval is strictly positive. |
| | |   // The local is deliberately not called heartbeatInterval so that it does |
| | |   // not shadow the field of the same name. |
| | |   final long configuredInterval = config.getHeartbeatInterval(); |
| | |   if (configuredInterval > 0) |
| | |   { |
| | |     heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(), |
| | |         getBaseDN().toNormalizedString(), session, configuredInterval); |
| | |     heartbeatMonitor.start(); |
| | |   } |
| | | } |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | getBaseDN().toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | break; |
| | | } |
| | | |
| | | final int serverId = getServerId(); |
| | | final DN baseDN = getBaseDN(); |
| | | final int previousRsServerID = rsServerId; |
| | | try |
| | | { |
| | |
| | | // best server checking. |
| | | final ReplicationServerInfo bestServerInfo = |
| | | computeBestReplicationServer(false, previousRsServerID, state, |
| | | replicationServerInfos, serverId, groupId, generationID); |
| | | replicationServerInfos, serverId, getGroupId(), |
| | | generationID); |
| | | if (previousRsServerID != -1 |
| | | && (bestServerInfo == null |
| | | || bestServerInfo.getServerId() != previousRsServerID)) |
| | |
| | | monitorResponse.set(false); |
| | | |
| | | // publish Monitor Request Message to the Replication Server |
| | | publish(new MonitorRequestMsg(serverId, getRsServerId())); |
| | | publish(new MonitorRequestMsg(getServerId(), getRsServerId())); |
| | | |
| | | // wait for Response up to 10 seconds. |
| | | try |
| | |
| | | public void stop() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will" |
| | | + " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDN); |
| | | TRACER.debugInfo("ReplicationBroker " + getServerId() + " is stopping" |
| | | + " and will close the connection to replication server " |
| | | + rsServerId + " for domain " + getBaseDN()); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return maxRcvWindow; |
| | | return config.getWindowSize(); |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Change some configuration parameters. |
| | | * |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param window The max window size. |
| | | * @param heartbeatInterval The heartBeat interval. |
| | | * |
| | | * @param newConfig The new config to use. |
| | | * @return A boolean indicating whether the changes |
| | | * require restarting the service. |
| | | * @param groupId The new group id to use |
| | | */ |
| | | public boolean changeConfig(Set<String> replicationServers, int window, |
| | | long heartbeatInterval, byte groupId) |
| | | public boolean changeConfig(ReplicationDomainCfg newConfig) |
| | | { |
| | | // These parameters need to be renegotiated with the ReplicationServer |
| | | // so if they have changed, that requires restarting the session with |
| | |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
| | | boolean needToRestartSession = |
| | | this.replicationServerUrls == null |
| | | || !replicationServers.equals(this.replicationServerUrls) |
| | | || window != this.maxRcvWindow |
| | | || heartbeatInterval != this.heartbeatInterval |
| | | || groupId != this.groupId; |
| | | !newConfig.getReplicationServer().equals(config.getReplicationServer()) |
| | | || newConfig.getWindowSize() != config.getWindowSize() |
| | | || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval() |
| | | || newConfig.getGroupId() != config.getGroupId(); |
| | | |
| | | this.replicationServerUrls = replicationServers; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.groupId = groupId; |
| | | this.config = newConfig; |
| | | this.rcvWindow = newConfig.getWindowSize(); |
| | | this.halfRcvWindow = this.rcvWindow / 2; |
| | | |
| | | return needToRestartSession; |
| | | } |
| | |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(serverId), |
| | | getBaseDN().toNormalizedString(), |
| | | Integer.toString(getServerId()), |
| | | ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the group id of the broker. |
| | | * @param groupId The new group id. |
| | | */ |
| | | public void setGroupId(byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for DSs in the topology (except us). |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | |
| | | sent by the replication server in the topology message. We must count |
| | | ourselves as a connected server. |
| | | */ |
| | | connectedDSs.add(serverId); |
| | | connectedDSs.add(getServerId()); |
| | | } |
| | | |
| | | for (DSInfo dsInfo : dsList) |
| | |
| | | /** |
| | | * Starts publishing to the RS the current timestamp used in this server. |
| | | */ |
| | | public void startChangeTimeHeartBeatPublishing() |
| | | private void startChangeTimeHeartBeatPublishing() |
| | | { |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); |
| | | if (changeTimeHeartbeatInterval > 0) |
| | | { |
| | | final Session localSession = session; |
| | | final String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + baseDN + "\" to RS(" + getRsServerId() |
| | | + getBaseDN() + "\" to RS(" + getRsServerId() |
| | | + ") at " + localSession.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | | threadName, localSession, changeTimeHeartbeatSendInterval, serverId); |
| | | threadName, localSession, changeTimeHeartbeatInterval, getServerId()); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this |
| | |
| | | /** |
| | | * Stops publishing to the RS the current timestamp used in this server. |
| | | */ |
| | | public synchronized void stopChangeTimeHeartBeatPublishing() |
| | | private synchronized void stopChangeTimeHeartBeatPublishing() |
| | | { |
| | | if (ctHeartbeatPublisherThread != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set a new change time heartbeat interval to this broker. |
| | | * @param changeTimeHeartbeatInterval The new interval (in ms). |
| | | */ |
| | | public void setChangeTimeHeartbeatInterval(int changeTimeHeartbeatInterval) |
| | | { |
| | | stopChangeTimeHeartBeatPublishing(); |
| | | this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; |
| | | startChangeTimeHeartBeatPublishing(); |
| | | } |
| | | |
| | | /** |
| | | * Set the connectRequiresRecovery to the provided value. |
| | | * This flag is used to indicate if a recovery of Update is necessary |
| | | * after a reconnection to a RS. |
| | |
| | | { |
| | | final StringBuilder sb = new StringBuilder(); |
| | | sb.append(getClass().getSimpleName()) |
| | | .append(" \"").append(baseDN).append(" ").append(serverId).append("\",") |
| | | .append(" groupId=").append(groupId) |
| | | .append(" \"").append(getBaseDN()).append(" ") |
| | | .append(getServerId()).append("\",") |
| | | .append(" groupId=").append(getGroupId()) |
| | | .append(", genId=").append(generationID) |
| | | .append(", connected=").append(connected).append(", "); |
| | | if (rsServerId == -1) |
| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | |
| | | * The startup phase of the ReplicationDomain subclass, |
| | | * should read the list of replication servers from the configuration, |
| | | * instantiate a {@link ServerState} then start the publish service |
| | | * by calling |
| | | * {@link #startPublishService(Set, int, long, long)}. |
| | | * by calling {@link #startPublishService(ReplicationDomainCfg)}. |
| | | * At this point it can start calling the {@link #publish(UpdateMsg)} |
| | | * method if needed. |
| | | * <p> |
| | |
| | | * - and each initialized/importer DS that publishes acknowledges each |
| | | * WINDOW/2 data msg received. |
| | | */ |
| | | protected int initWindow = 100; |
| | | protected final int initWindow; |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections |
| | | .emptySet(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet(); |
| | | |
| | | /** |
| | | * An object used to protect the initialization of the underlying broker |
| | |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = 100; |
| | | this.state = serverState; |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] starting " + this.getName()); |
| | | TRACER.debugInfo("[IE] starting " + getName()); |
| | | try |
| | | { |
| | | initializeRemote(serverIdToInitialize, serverIdToInitialize, null, |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] ending " + this.getName()); |
| | | TRACER.debugInfo("[IE] ending " + getName()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public int decodeTarget(String targetString) throws DirectoryException |
| | | { |
| | | if (targetString.equalsIgnoreCase("all")) |
| | | if ("all".equalsIgnoreCase(targetString)) |
| | | { |
| | | return RoutableMsg.ALL_SERVERS; |
| | | } |
| | |
| | | "[IE] wait for start dsid " + dsi.getDsId() |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | | + " " + this.getGenerationID()); |
| | | + " " + getGenerationID()); |
| | | if (ieContext.startList.contains(dsi.getDsId())) |
| | | { |
| | | if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) |
| | |
| | | } |
| | | else |
| | | { |
| | | if (dsInfo.getGenerationId() == this.getGenerationID()) |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | |
| | | { |
| | | // Rejects 2 simultaneous exports |
| | | Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | ieContext = new IEContext(importInProgress); |
| | |
| | | */ |
| | | private void processErrorMsg(ErrorMsg errorMsg) |
| | | { |
| | | if (ieContext != null) |
| | | //Exporting must not be stopped on the first error, if we run initialize-all |
| | | if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | /* |
| | | Exporting must not be stopped on the first error, if we |
| | | run initialize-all. |
| | | */ |
| | | if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | { |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails())); |
| | | ieContext.setException( |
| | | new DirectoryException(ResultCode.OTHER, errorMsg.getDetails())); |
| | | } |
| | | |
| | | /* |
| | | * This can happen : |
| | | * - on the first InitReqMsg sent when source is not known for example |
| | | * - on the next attempt when source crashed and did not reconnect |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(ieContext.getException()); |
| | | /* |
| | | * This can happen : |
| | | * - on the first InitReqMsg sent when source is not known for example |
| | | * - on the next attempt when source crashed and did not reconnect |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask) ieContext.initializeTask) |
| | | .updateTaskCompletionState(ieContext.getException()); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | | releaseIEContext(); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | /* |
| | | This is the normal termination of the import |
| | | No error is stored and the import is ended |
| | | by returning null |
| | | No error is stored and the import is ended by returning null |
| | | */ |
| | | return null; |
| | | } |
| | |
| | | { |
| | | /* |
| | | This is an error termination during the import |
| | | The error is stored and the import is ended |
| | | by returning null |
| | | The error is stored and the import is ended by returning null |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | { |
| | |
| | | { |
| | | // Other messages received during an import are trashed except |
| | | // the topologyMsg. |
| | | if ((msg instanceof TopologyMsg) && |
| | | (isRemoteDSConnected(ieContext.importSource)==null)) |
| | | if (msg instanceof TopologyMsg |
| | | && isRemoteDSConnected(ieContext.importSource) == null) |
| | | { |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | |
| | | catch(Exception e) { /* do nothing */ } |
| | | |
| | | // process any connection error |
| | | if ((broker.hasConnectionError())|| |
| | | (broker.getNumLostConnections()!= ieContext.initNumLostConnections)) |
| | | if (broker.hasConnectionError() |
| | | || broker.getNumLostConnections() != ieContext.initNumLostConnections) |
| | | { |
| | | // publish failed - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | |
| | | * @throws DirectoryException When the generation ID of the Replication |
| | | * Servers is not the expected value. |
| | | */ |
| | | private void checkGenerationID(long generationID) |
| | | throws DirectoryException |
| | | private void checkGenerationID(long generationID) throws DirectoryException |
| | | { |
| | | boolean allSet = true; |
| | | |
| | |
| | | public void resetReplicationLog() throws DirectoryException |
| | | { |
| | | // Reset the Generation ID to -1 to clean the ReplicationServers. |
| | | resetGenerationId((long)-1); |
| | | resetGenerationId(-1L); |
| | | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | checkGenerationID(-1); |
| | |
| | | * @throws DirectoryException When an error occurs |
| | | */ |
| | | public void resetGenerationId(Long generationIdNewValue) |
| | | throws DirectoryException |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | |
| | | ResetGenerationIdMsg genIdMessage; |
| | | |
| | | if (generationIdNewValue == null) |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(this.getGenerationID()); |
| | | } |
| | | else |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(generationIdNewValue); |
| | | } |
| | | ResetGenerationIdMsg genIdMessage = |
| | | new ResetGenerationIdMsg(getGenId(generationIdNewValue)); |
| | | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(), |
| | | Integer.toString(serverID), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | broker.publish(genIdMessage); |
| | | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | if (generationIdNewValue == null) |
| | | checkGenerationID(getGenId(generationIdNewValue)); |
| | | } |
| | | |
| | | private long getGenId(Long generationIdNewValue) |
| | | { |
| | | if (generationIdNewValue != null) |
| | | { |
| | | checkGenerationID(this.getGenerationID()); |
| | | return generationIdNewValue; |
| | | } |
| | | else |
| | | { |
| | | checkGenerationID(generationIdNewValue); |
| | | } |
| | | return getGenerationID(); |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | |
| | | /** |
| | | * Start the publish mechanism of the Replication Service. |
| | | * After this method has been called, the publish service can be used |
| | | * by calling the {@link #publish(UpdateMsg)} method. |
| | | * Start the publish mechanism of the Replication Service. After this method |
| | | * has been called, the publish service can be used by calling the |
| | | * {@link #publish(UpdateMsg)} method. |
| | | * |
| | | * @param replicationServers The replication servers that should be used. |
| | | * @param window The window size of this replication domain. |
| | | * @param heartbeatInterval The heartbeatInterval that should be used |
| | | * to check the availability of the replication |
| | | * servers. |
| | | * @param changetimeHeartbeatInterval The interval used to send change |
| | | * time heartbeat to the replication server. |
| | | * |
| | | * @throws ConfigException If the DirectoryServer configuration was |
| | | * incorrect. |
| | | * @param config |
| | | * The configuration that should be used. |
| | | * @throws ConfigException |
| | | * If the DirectoryServer configuration was incorrect. |
| | | */ |
| | | public void startPublishService(Set<String> replicationServers, int window, |
| | | long heartbeatInterval, long changetimeHeartbeatInterval) |
| | | throws ConfigException |
| | | public void startPublishService(ReplicationDomainCfg config) |
| | | throws ConfigException |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | |
| | | { |
| | | // create the broker object used to publish and receive changes |
| | | broker = new ReplicationBroker( |
| | | this, state, baseDN, |
| | | serverID, window, |
| | | getGenerationID(), |
| | | heartbeatInterval, |
| | | new ReplSessionSecurity(), |
| | | getGroupId(), |
| | | changetimeHeartbeatInterval); |
| | | |
| | | broker.start(replicationServers); |
| | | this, state, config, getGenerationID(), new ReplSessionSecurity()); |
| | | broker.start(); |
| | | } |
| | | } |
| | | } |
| | |
| | | * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(Collection, int, long, long)}. |
| | | * {@link #startPublishService(ReplicationDomainCfg)}. |
| | | */ |
| | | public void startListenService() |
| | | { |
| | |
| | | * <p> |
| | | * The Replication Service will restart from the point indicated by the |
| | | * {@link ServerState} that was given as a parameter to the |
| | | * {@link #startPublishService(Collection, int, long, long)} |
| | | * at startup time. |
| | | * {@link #startPublishService(ReplicationDomainCfg)} at startup time. |
| | | * <p> |
| | | * If some data have changed in the repository during the period of time when |
| | | * the Replication Service was disabled, this {@link ServerState} should |
| | | * therefore be updated by the Replication Domain subclass before calling |
| | | * this method. This method is not MT safe. |
| | | * therefore be updated by the Replication Domain subclass before calling this |
| | | * method. This method is not MT safe. |
| | | */ |
| | | public void enableService() |
| | | { |
| | |
| | | /** |
| | | * Change some ReplicationDomain parameters. |
| | | * |
| | | * @param replicationServers The new set of Replication Servers that this |
| | | * domain should now use. |
| | | * @param windowSize The window size that this domain should use. |
| | | * @param heartbeatInterval The heartbeatInterval that this domain should |
| | | * use. |
| | | * @param groupId The new group id to use |
| | | * @param config |
| | | * The new configuration that this domain should now use. |
| | | */ |
| | | public void changeConfig(Set<String> replicationServers, int windowSize, |
| | | long heartbeatInterval, byte groupId) |
| | | public void changeConfig(ReplicationDomainCfg config) |
| | | { |
| | | this.groupId = groupId; |
| | | this.groupId = (byte) config.getGroupId(); |
| | | |
| | | if (broker != null |
| | | && broker.changeConfig(replicationServers, windowSize, |
| | | heartbeatInterval, groupId)) |
| | | if (broker != null && broker.changeConfig(config)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | |
| | | one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | */ |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if (msg.isAssured()) |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (msg.isAssured() |
| | | && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | { |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (broker.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | if (rsGroupId == groupId) |
| | | { |
| | | if (rsGroupId == groupId) |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Mark the error in the ack |
| | | // -> replay error occurred |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | assuredSrReceivedUpdatesNotAcked.incrementAndGet(); |
| | | } else |
| | | { |
| | | assuredSrReceivedUpdatesAcked.incrementAndGet(); |
| | | } |
| | | // Mark the error in the ack |
| | | // -> replay error occurred |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(serverID), msgAssuredMode.toString(), |
| | | getBaseDNString(), msg.toString()); |
| | | logError(errorMsg); |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | assuredSrReceivedUpdatesNotAcked.incrementAndGet(); |
| | | } |
| | | else |
| | | { |
| | | assuredSrReceivedUpdatesAcked.incrementAndGet(); |
| | | } |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | | else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = |
| | | ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID), |
| | | msgAssuredMode.toString(), getBaseDNString(), msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | | |
| | | incProcessedUpdates(); |
| | |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured mode configured, wait for acknowledgement for the just sent |
| | | // If assured mode configured, wait for acknowledgment for the just sent |
| | | // message |
| | | if (assured && rsGroupId == groupId) |
| | | { |
| | |
| | | remove the update from the wait list, log the timeout error and |
| | | also update assured monitoring counters |
| | | */ |
| | | UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | |
| | | if (update != null) |
| | | { |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer( |
| | | assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " |
| | | + csn + " and replication servceID: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | } else |
| | | final UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | if (update == null) |
| | | { |
| | | // Ack received just before timeout limit: we can exit |
| | | break; |
| | | } |
| | | |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " + csn |
| | | + " and replication domain: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | } |
| | | } |
| | | } |
| | |
| | | update = new UpdateMsg(generator.newCSN(), msg); |
| | | /* |
| | | If assured replication is configured, this will prepare blocking |
| | | mechanism. If assured replication is disabled, this returns |
| | | immediately |
| | | mechanism. If assured replication is disabled, this returns immediately |
| | | */ |
| | | prepareWaitForAckIfAssuredEnabled(update); |
| | | |
| | |
| | | waitForAckIfAssuredEnabled(update); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | // This exception may only be raised if assured replication is enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(assuredTimeout), update.toString()); |
| | | logError(errorMsg); |
| | |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.GenerationIdChecksum; |
| | | import org.opends.server.replication.plugin.LDAPReplicationDomain; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.plugin.PersistentServerState; |
| | | import org.opends.server.replication.plugin.*; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.Session; |
| | |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | * does not exist, take the 'empty backend' generationID. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession(final DN baseDN, |
| | | int serverId, int window_size, int port, int timeout, |
| | | int serverId, int windowSize, int port, int timeout, |
| | | boolean emptyOldChanges) throws Exception |
| | | { |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | return openReplicationSession(baseDN, serverId, windowSize, |
| | | port, timeout, emptyOldChanges, getGenerationId(baseDN), null); |
| | | } |
| | | |
| | |
| | | * providing the generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession(final DN baseDN, |
| | | int serverId, int window_size, int port, int timeout, |
| | | int serverId, int windowSize, int port, int timeout, |
| | | boolean emptyOldChanges, long generationId) throws Exception |
| | | { |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | return openReplicationSession(baseDN, serverId, windowSize, |
| | | port, timeout, emptyOldChanges, generationId, null); |
| | | } |
| | | |
| | |
| | | * providing the generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession(final DN baseDN, |
| | | int serverId, int window_size, int port, int timeout, |
| | | int serverId, int windowSize, int port, int timeout, |
| | | boolean emptyOldChanges, long generationId, |
| | | ReplicationDomain replicationDomain) throws Exception |
| | | { |
| | | DomainFakeCfg config = newFakeCfg(baseDN, serverId, port); |
| | | config.setWindowSize(windowSize); |
| | | return openReplicationSession(config, port, timeout, emptyOldChanges, |
| | | generationId, replicationDomain); |
| | | } |
| | | |
| | | protected ReplicationBroker openReplicationSession(ReplicationDomainCfg config, |
| | | int port, int timeout, boolean emptyOldChanges, long generationId, |
| | | ReplicationDomain replicationDomain) throws Exception |
| | | { |
| | | ServerState state = new ServerState(); |
| | | |
| | | if (emptyOldChanges) |
| | | new PersistentServerState(baseDN, serverId, new ServerState()); |
| | | new PersistentServerState(config.getBaseDN(), config.getServerId(), new ServerState()); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker(replicationDomain, |
| | | state, baseDN, serverId, window_size, |
| | | generationId, 100000, getReplSessionSecurity(), (byte)1, 500); |
| | | ReplicationBroker broker = new ReplicationBroker(replicationDomain, state, |
| | | config, generationId, getReplSessionSecurity()); |
| | | connect(broker, port, timeout); |
| | | return broker; |
| | | } |
| | | |
| | | private void connect(ReplicationBroker broker, int port, int timeout) throws Exception |
| | | protected DomainFakeCfg newFakeCfg(final DN baseDN, int serverId, int port) |
| | | { |
| | | broker.start(Collections.singleton("localhost:" + port)); |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverId, newSortedSet("localhost:" + port)); |
| | | fakeCfg.setHeartbeatInterval(100000); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | protected void connect(ReplicationBroker broker, int port, int timeout) throws Exception |
| | | { |
| | | broker.start(); |
| | | // give some time to the broker to connect to the replicationServer. |
| | | checkConnection(30, broker, port); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer |
| | | * with a default value generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession(final DN baseDN, |
| | | int serverId, int window_size, int port, int timeout, ServerState state) |
| | | throws Exception |
| | | { |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | port, timeout, state, getGenerationId(baseDN)); |
| | | } |
| | | |
| | | /** |
| | | * Open a new session to the ReplicationServer |
| | | * starting with a given ServerState. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession(final DN baseDN, |
| | | int serverId, int window_size, int port, int timeout, ServerState state, |
| | | long generationId) throws Exception |
| | | { |
| | | ReplicationBroker broker = new ReplicationBroker(null, |
| | | state, baseDN, serverId, window_size, generationId, |
| | | 100000, getReplSessionSecurity(), (byte)1, 500); |
| | | connect(broker, port, timeout); |
| | | return broker; |
| | | } |
| | | |
| | | protected void deleteEntry(DN dn) throws Exception |
| | | { |
| | | if (dn.getParent().getRDN().toString().equalsIgnoreCase("cn=domains")) |
| | |
| | | * |
| | | * |
| | | * Copyright 2007-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | |
| | | import org.opends.server.admin.Configuration; |
| | | import org.opends.server.admin.server.ConfigurationAddListener; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.server.ConfigurationDeleteListener; |
| | | import org.opends.server.admin.server.ServerManagedObject; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; |
| | | import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; |
| | |
| | | */ |
| | | public class DomainFakeCfg implements ReplicationDomainCfg |
| | | { |
| | | private DN baseDn; |
| | | private DN baseDN; |
| | | private int serverId; |
| | | private SortedSet<String> replicationServers; |
| | | private long heartbeatInterval = 1000; |
| | | |
| | | // By default changeTimeHeartbeatInterval is set to 0 in order to disable |
| | | // this feature and not kill the tests that expect to receive special |
| | | // messages. |
| | | /** |
| | | * By default changeTimeHeartbeatInterval is set to 0 in order to disable this |
| | | * feature and not kill the tests that expect to receive special messages. |
| | | */ |
| | | private long changeTimeHeartbeatInterval = 0; |
| | | |
| | | private IsolationPolicy policy = IsolationPolicy.REJECT_ALL_UPDATES; |
| | | |
| | | /** Whether assured mode is enabled. */ |
| | | private boolean assured = false; |
| | | // Assured sub mode (used when assured is true) |
| | | /** Assured sub mode (used when assured is true) */ |
| | | private AssuredType assuredType = AssuredType.NOT_ASSURED; |
| | | // Safe Data level (used when assuredType is safe data) |
| | | /** Safe Data level (used when assuredType is safe data) */ |
| | | private int assuredSdLevel = 1; |
| | | // Timeout (in milliseconds) when waiting for acknowledgments |
| | | /** Timeout (in milliseconds) when waiting for acknowledgments */ |
| | | private long assuredTimeout = 1000; |
| | | // Group id |
| | | /** Group id */ |
| | | private int groupId = 1; |
| | | // Referrals urls to be published to other servers of the topology |
| | | SortedSet<String> refUrls = new TreeSet<String>(); |
| | | /** Referrals urls to be published to other servers of the topology */ |
| | | private SortedSet<String> refUrls = new TreeSet<String>(); |
| | | |
| | | private SortedSet<String> fractionalExcludes = new TreeSet<String>(); |
| | | private SortedSet<String> fractionalIncludes = new TreeSet<String>(); |
| | | |
| | | private ExternalChangelogDomainCfg eclCfg = |
| | | new ExternalChangelogDomainFakeCfg(true, null, null); |
| | | private int windowSize = 100; |
| | | |
| | | /** |
| | | * Creates a new Domain with the provided information |
| | | * (assured mode disabled, default group id) |
| | | */ |
| | | public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers) |
| | | public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | this.replicationServers = replServers; |
| | | } |
| | |
| | | * Creates a new Domain with the provided information |
| | | * (with some fractional configuration provided) |
| | | */ |
| | | public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers, |
| | | public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers, |
| | | List<String> fractionalExcludes, List<String> fractionalIncludes) |
| | | { |
| | | this(baseDn, serverId, replServers); |
| | | this(baseDN, serverId, replServers); |
| | | if (fractionalExcludes != null) |
| | | { |
| | | for (String str : fractionalExcludes) |
| | |
| | | * Creates a new Domain with the provided information |
| | | * (assured mode disabled, group id provided) |
| | | */ |
| | | public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers, |
| | | public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers, |
| | | int groupId) |
| | | { |
| | | this(baseDn, serverId, replServers); |
| | | this(baseDN, serverId, replServers); |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | |
| | | * Creates a new Domain with the provided information |
| | | * (assured mode info provided as well as group id) |
| | | */ |
| | | public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers, |
| | | public DomainFakeCfg(DN baseDN, int serverId, SortedSet<String> replServers, |
| | | AssuredType assuredType, int assuredSdLevel, int groupId, |
| | | long assuredTimeout, SortedSet<String> refUrls) |
| | | { |
| | | this(baseDn, serverId, replServers); |
| | | this(baseDN, serverId, replServers); |
| | | switch(assuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | assured = false; |
| | | break; |
| | | case SAFE_DATA: |
| | | case SAFE_READ: |
| | | assured = true; |
| | | this.assuredType = assuredType; |
| | | break; |
| | | } |
| | |
| | | /** |
| | | * Create a new Domain from the provided arguments. |
| | | * |
| | | * @param string The baseDN in string form. |
| | | * @param baseDN The baseDN in string form. |
| | | * @param serverId The serverID. |
| | | * @param replServer The replication Server that will be used. |
| | | * |
| | | * @throws DirectoryException When the provided string is not a valid DN. |
| | | */ |
| | | public DomainFakeCfg(String string, int serverId, String replServer) |
| | | public DomainFakeCfg(String baseDN, int serverId, String replServer) |
| | | throws DirectoryException |
| | | { |
| | | this.replicationServers = new TreeSet<String>(); |
| | | this.replicationServers.add(replServer); |
| | | this.baseDn = DN.decode(string); |
| | | this.baseDN = DN.decode(baseDN); |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void addChangeListener( |
| | | ConfigurationChangeListener<ReplicationDomainCfg> listener) |
| | | { |
| | | |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Class<? extends ReplicationDomainCfg> configurationClass() |
| | | { |
| | | return null; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return heartbeatInterval ; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public long getChangetimeHeartbeatInterval() |
| | | { |
| | | return changeTimeHeartbeatInterval; |
| | | } |
| | | |
| | | /** |
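| | | * Sets the change time heartbeat interval returned by |
| | | * {@link #getChangetimeHeartbeatInterval()}. |
| | | * |
| | | * @param changeTimeHeartbeatInterval the new change time heartbeat interval |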
| | | */ |
| | | public void setChangetimeHeartbeatInterval(long changeTimeHeartbeatInterval) |
| | | { |
| | | this.changeTimeHeartbeatInterval = changeTimeHeartbeatInterval; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getMaxReceiveDelay() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getMaxReceiveQueue() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getMaxSendDelay() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getMaxSendQueue() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDn; |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public SortedSet<String> getReplicationServer() |
| | | { |
| | | return replicationServers; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public int getWindowSize() |
| | | { |
| | | return 100; |
| | | return this.windowSize; |
| | | } |
| | | |
| | | public void setWindowSize(int windowSize) |
| | | { |
| | | this.windowSize = windowSize; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void removeChangeListener( |
| | | ConfigurationChangeListener<ReplicationDomainCfg> listener) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public DN dn() |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ServerManagedObject<? extends Configuration> managedObject() { |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Set the heartbeat interval. |
| | | * |
| | | * @param interval the heartbeat interval to set |
| | | */ |
| | | public void setHeartbeatInterval(long interval) |
| | | { |
| | |
| | | /** |
| | | * Get the isolation policy. |
| | | */ |
| | | @Override |
| | | public IsolationPolicy getIsolationPolicy() |
| | | { |
| | | return policy; |
| | |
| | | this.policy = policy; |
| | | } |
| | | |
| | | @Override |
| | | public int getAssuredSdLevel() |
| | | { |
| | | return assuredSdLevel; |
| | | } |
| | | |
| | | @Override |
| | | public int getGroupId() |
| | | { |
| | | return groupId; |
| | | } |
| | | |
| | | @Override |
| | | public long getAssuredTimeout() |
| | | { |
| | | return assuredTimeout; |
| | | } |
| | | |
| | | @Override |
| | | public AssuredType getAssuredType() |
| | | { |
| | | return assuredType; |
| | | } |
| | | |
| | | /** |
| | | * Indicates whether assured mode is enabled in this fake configuration. |
| | | * |
| | | * @return the current value of the {@code assured} flag |
| | | */ |
| | | public boolean isAssured() |
| | | { |
| | | return assured; |
| | | } |
| | | |
| | | @Override |
| | | public SortedSet<String> getReferralsUrl() |
| | | { |
| | | return refUrls; |
| | | } |
| | | |
| | | @Override |
| | | public SortedSet<String> getFractionalExclude() |
| | | { |
| | | return fractionalExcludes; |
| | | } |
| | | |
| | | @Override |
| | | public SortedSet<String> getFractionalInclude() |
| | | { |
| | | return fractionalIncludes; |
| | | } |
| | | |
| | | @Override |
| | | public boolean isSolveConflicts() |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | public long getInitializationHeartbeatInterval() |
| | | { |
| | | return 180; |
| | | } |
| | | |
| | | |
| | | @Override |
| | | public int getInitializationWindowSize() |
| | | { |
| | | return 100; |
| | | } |
| | | |
| | | public boolean hasExternalChangelogDomain() { return true; } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Gets the ECL Domain if it is present. |
| | | * |
| | |
| | | * If the ECL Domain does not exist or it could not |
| | | * be successfully decoded. |
| | | */ |
| | | @Override |
| | | public ExternalChangelogDomainCfg getExternalChangelogDomain() |
| | | throws ConfigException |
| | | { return eclCfg; } |
| | |
| | | /** |
| | | * Sets the ECL Domain if it is present. |
| | | * |
| | | * @return Returns the ECL Domain if it is present. |
| | | * @throws ConfigException |
| | | * If the ECL Domain does not exist or it could not |
| | | * be successfully decoded. |
| | |
| | | ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener) |
| | | {} |
| | | |
| | | @Override |
| | | public boolean isLogChangenumber() |
| | | { |
| | | return true; |
| | |
| | | * historical information necessary to solve conflicts. |
| | | * |
| | | * @return Returns the value of the "conflicts-historical-purge-delay" property. |
| | | **/ |
| | | */ |
| | | @Override |
| | | public long getConflictsHistoricalPurgeDelay() |
| | | { |
| | | return 1440; |
| | |
| | | private void createFakeReplicationDomain(boolean firstBackend, |
| | | long generationId) throws Exception |
| | | { |
| | | Set<String> replicationServers = newSet("localhost:" + replServerPort); |
| | | SortedSet<String> replicationServers = newSortedSet("localhost:" + replServerPort); |
| | | |
| | | DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING); |
| | | replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId); |
| | | replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId); |
| | | |
| | | // Test connection |
| | | assertTrue(replicationDomain.isConnected()); |
| | |
| | | new LinkedBlockingQueue<UpdateMsg>(); |
| | | |
| | | /** A string that will be exported should exportBackend be called. */ |
| | | private String exportString = null; |
| | | private String exportString; |
| | | |
| | | /** |
| | | * A StringBuilder that will be used to build a new String should the import |
| | | * be called. |
| | | */ |
| | | private StringBuilder importString = null; |
| | | private StringBuilder importString; |
| | | private int exportedEntryCount; |
| | | private long generationID = -1; |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | Set<String> replicationServers, int window, long heartbeatInterval, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | generationID = generationId; |
| | | startPublishService(replicationServers, window, heartbeatInterval, 500); |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | startPublishService(fakeCfg); |
| | | startListenService(); |
| | | } |
| | | |
| | |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | |
| | | import org.assertj.core.api.Assertions; |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | |
| | | |
| | | public static class TestBroker extends ReplicationBroker |
| | | { |
| | | List<ReplicationMsg> list = null; |
| | | private List<ReplicationMsg> list; |
| | | |
| | | public TestBroker(List<ReplicationMsg> list) |
| | | { |
| | | super(null, null, null, 0, 0, 0, 0, null, (byte) 0, 0); |
| | | super(null, null, new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null); |
| | | this.list = list; |
| | | } |
| | | |
| | |
| | | // Read the entry back to get its historical and included CSN |
| | | Entry entry = DirectoryServer.getEntry(dn1); |
| | | List<Attribute> attrs1 = entry.getAttribute(histType); |
| | | |
| | | assertTrue(attrs1 != null); |
| | | assertTrue(attrs1.isEmpty() != true); |
| | | Assertions.assertThat(attrs1).isNotEmpty(); |
| | | |
| | | String histValue = |
| | | attrs1.get(0).iterator().next().getValue().toString(); |
| | |
| | | |
| | | Entry entry2 = DirectoryServer.getEntry(dn1); |
| | | List<Attribute> attrs2 = entry2.getAttribute(histType); |
| | | |
| | | assertTrue(attrs2 != null); |
| | | assertTrue(attrs2.isEmpty() != true); |
| | | Assertions.assertThat(attrs2).isNotEmpty(); |
| | | |
| | | for (AttributeValue av : attrs2.get(0)) { |
| | | logError(Message.raw(Category.SYNC, Severity.INFORMATION, |
| | |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.testng.Assert.*; |
| | |
| | | private ReplicationBroker createReplicationBroker(int dsId, |
| | | ServerState state, long generationId) throws Exception |
| | | { |
| | | SortedSet<String> replServers = newSortedSet("localhost:" + rs1Port); |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers); |
| | | ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true); |
| | | ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_, |
| | | dsId, 100, generationId, 0, security, (byte) 1, 500); |
| | | broker.start(Collections.singleton("localhost:" + rs1Port)); |
| | | ReplicationBroker broker = new ReplicationBroker(null, state, fakeCfg, generationId, security); |
| | | broker.start(); |
| | | checkConnection(30, broker, rs1Port); |
| | | return broker; |
| | | } |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.DomainFakeCfg; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | |
| | | int scenario) |
| | | throws Exception |
| | | { |
| | | return createFakeReplicationDomain(serverId, groupId, rsId, generationId, assured, |
| | | assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true, 100); |
| | | ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId)); |
| | | return createFakeReplicationDomain(config, groupId, rsId, generationId, assured, |
| | | assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true); |
| | | } |
| | | |
| | | private int getRsPort(int rsId) |
| | |
| | | /** |
| | | * Creates a new fake replication domain, using the passed scenario. |
| | | * |
| | | * @param serverId |
| | | * The server ID for the replication domain. |
| | | * @param groupId |
| | | * The group ID for the replication domain. |
| | | * @param rsId |
| | |
| | | * @param startListen |
| | | * If true, we start the listen service. In all cases, the publish |
| | | * service gets started. |
| | | * @param window |
| | | * The window size for replication |
| | | * @return |
| | | * The FakeReplicationDomain, a mock-up of a Replication Domain |
| | | * for tests |
| | | * @throws Exception |
| | | * |
| | | */ |
| | | private FakeReplicationDomain createFakeReplicationDomain(int serverId, |
| | | int groupId, int rsId, long generationId, boolean assured, |
| | | AssuredMode assuredMode, int safeDataLevel, long assuredTimeout, |
| | | int scenario, ServerState serverState, boolean startListen, int window) |
| | | throws Exception |
| | | private FakeReplicationDomain createFakeReplicationDomain( |
| | | ReplicationDomainCfg config, int groupId, int rsId, long generationId, |
| | | boolean assured, AssuredMode assuredMode, int safeDataLevel, |
| | | long assuredTimeout, int scenario, ServerState serverState, |
| | | boolean startListen) throws Exception |
| | | { |
| | | // Set port to right real RS according to its id |
| | | int rsPort = getRsPort(rsId); |
| | | // Set port to right real RS according to its id |
| | | int rsPort = getRsPort(rsId); |
| | | |
| | | FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain( |
| | | DN.decode(TEST_ROOT_DN_STRING), serverId, generationId, |
| | | (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, |
| | | scenario, serverState); |
| | | FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain( |
| | | config.getBaseDN(), config.getServerId(), generationId, (byte) groupId, |
| | | assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState); |
| | | |
| | | Set<String> replicationServers = newSet("localhost:" + rsPort); |
| | | fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500); |
| | | if (startListen) |
| | | fakeReplicationDomain.startListenService(); |
| | | fakeReplicationDomain.startPublishService(config); |
| | | if (startListen) |
| | | fakeReplicationDomain.startListenService(); |
| | | |
| | | // Test connection |
| | | assertTrue(fakeReplicationDomain.isConnected()); |
| | | // Test connection |
| | | assertTrue(fakeReplicationDomain.isConnected()); |
| | | // Check connected server port |
| | | HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer()); |
| | | assertEquals(rd.getPort(), rsPort); |
| | | |
| | | return fakeReplicationDomain; |
| | | return fakeReplicationDomain; |
| | | } |
| | | |
| | | private DomainFakeCfg newFakeCfg(int serverId, int rsPort) throws Exception |
| | | { |
| | | DN baseDN = DN.decode(TEST_ROOT_DN_STRING); |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverId, newSortedSet("localhost:" + rsPort)); |
| | | fakeCfg.setHeartbeatInterval(1000); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | /** |
| | |
| | | // Create and connect DS 2 to RS 1 |
| | | // Assured mode: SR |
| | | ServerState serverState = fakeRd1.getServerState(); |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID)); |
| | | fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO, serverState, true, 100); |
| | | REPLY_OK_DS_SCENARIO, serverState, true); |
| | | |
| | | // Wait for connections to be established |
| | | waitForStableTopo(fakeRd1, 1, 1); |
| | |
| | | TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 2 connected to RS 1 with low window to easily put it in DEGRADED status |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID)); |
| | | config.setWindowSize(2); |
| | | fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO, new ServerState(), false, 2); |
| | | REPLY_OK_DS_SCENARIO, new ServerState(), false); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | try |
| | | { |
| | | // Create broker on o=test |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | server01.setChangeTimeHeartbeatInterval(100); //ms |
| | | DomainFakeCfg config1 = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort); |
| | | config1.setChangetimeHeartbeatInterval(100); // ms |
| | | server01 = openReplicationSession(config1, replicationServerPort, |
| | | brokerSessionTimeout, true, getGenerationId(TEST_ROOT_DN), null); |
| | | |
| | | // Create broker on o=test2 |
| | | server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | server02.setChangeTimeHeartbeatInterval(100); //ms |
| | | DomainFakeCfg config2 = newFakeCfg(TEST_ROOT_DN2, SERVER_ID_2, replicationServerPort); |
| | | config2.setChangetimeHeartbeatInterval(100); //ms |
| | | server02 = openReplicationSession(config2, replicationServerPort, |
| | | brokerSessionTimeout, true, EMPTY_DN_GENID, null); |
| | | |
| | | int ts = 1; |
| | | // Produce update 1 |
| | |
| | | |
| | | // Connect to the replicationServer using the state created above. |
| | | try { |
| | | broker = openReplicationSession(TEST_ROOT_DN, |
| | | 3, 100, replicationServerPort, 5000, state); |
| | | broker = new ReplicationBroker(null, state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort), |
| | | getGenerationId(TEST_ROOT_DN), getReplSessionSecurity()); |
| | | connect(broker, replicationServerPort, 5000); |
| | | |
| | | ReplicationMsg receivedMsg = broker.receive(); |
| | | broker.updateWindowAfterReplay(); |
| | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.plugin.DomainFakeCfg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | |
| | | |
| | | private long generationID = 1; |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | Set<String> replicationServers, int window, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | private FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval) |
| | | throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | startPublishService(replicationServers, window, heartbeatInterval, 500); |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | startPublishService(fakeCfg); |
| | | startListenService(); |
| | | } |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | { |
| | | this(baseDN, serverID, replicationServers, heartbeatInterval); |
| | | this.queue = queue; |
| | | } |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | Set<String> replicationServers, int window, long heartbeatInterval, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | String exportString, StringBuilder importString, int exportedEntryCount) |
| | | throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | startPublishService(replicationServers, window, heartbeatInterval, 500); |
| | | startListenService(); |
| | | this(baseDN, serverID, replicationServers, heartbeatInterval); |
| | | this.exportString = exportString; |
| | | this.importString = importString; |
| | | this.exportedEntryCount = exportedEntryCount; |
| | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.plugin.DomainFakeCfg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | |
| | | * A blocking queue that is used to send the UpdateMsg received from the |
| | | * Replication Service. |
| | | */ |
| | | private BlockingQueue<UpdateMsg> queue = null; |
| | | private BlockingQueue<UpdateMsg> queue; |
| | | |
| | | public FakeStressReplicationDomain(DN baseDN, int serverID, |
| | | Set<String> replicationServers, int window, long heartbeatInterval, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | startPublishService(replicationServers, window, heartbeatInterval, 500); |
| | | final DomainFakeCfg fakeCfg = |
| | | new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | startPublishService(fakeCfg); |
| | | startListenService(); |
| | | this.queue = queue; |
| | | } |
| | |
| | | replServer2 = createReplicationServer(replServerID2, replServerPort2, |
| | | "ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1); |
| | | |
| | | Set<String> servers = newSet("localhost:" + replServerPort1); |
| | | |
| | | SortedSet<String> servers = newSortedSet("localhost:" + replServerPort1); |
| | | BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain1 = new FakeReplicationDomain( |
| | | testService, domain1ServerId, servers, 100, 1000, rcvQueue1); |
| | | testService, domain1ServerId, servers, 1000, rcvQueue1); |
| | | |
| | | Set<String> servers2 = newSet("localhost:" + replServerPort2); |
| | | SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2); |
| | | BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain2 = new FakeReplicationDomain( |
| | | testService, domain2ServerId, servers2, 100, 1000, rcvQueue2); |
| | | testService, domain2ServerId, servers2, 1000, rcvQueue2); |
| | | |
| | | Thread.sleep(500); |
| | | |
| | |
| | | replServer1 = createReplicationServer(replServerID1, replServerPort, |
| | | "ReplicationDomainTestDb", 100000, "localhost:" + replServerPort); |
| | | |
| | | Set<String> servers = newSet("localhost:" + replServerPort); |
| | | SortedSet<String> servers = newSortedSet("localhost:" + replServerPort); |
| | | BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain1 = new FakeReplicationDomain( |
| | | testService, domain1ServerId, servers, 1000, 100000, rcvQueue1); |
| | | testService, domain1ServerId, servers, 100000, rcvQueue1); |
| | | |
| | | |
| | | /* |
| | |
| | | |
| | | replServer = createReplicationServer(replServerID, replServerPort, |
| | | "exportAndImportData", 100); |
| | | Set<String> servers = newSet("localhost:" + replServerPort); |
| | | SortedSet<String> servers = newSortedSet("localhost:" + replServerPort); |
| | | |
| | | StringBuilder exportedDataBuilder = new StringBuilder(); |
| | | for (int i =0; i<ENTRYCOUNT; i++) |
| | |
| | | } |
| | | String exportedData=exportedDataBuilder.toString(); |
| | | domain1 = new FakeReplicationDomain( |
| | | testService, serverId1, servers, |
| | | 100, 0, exportedData, null, ENTRYCOUNT); |
| | | testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT); |
| | | |
| | | StringBuilder importedData = new StringBuilder(); |
| | | domain2 = new FakeReplicationDomain( |
| | | testService, serverId2, servers, 100, 0, |
| | | null, importedData, 0); |
| | | testService, serverId2, servers, 0, null, importedData, 0); |
| | | |
| | | /* |
| | | * Trigger a total update from domain1 to domain2. |
| | |
| | | replServer2 = createReplicationServer(replServerID2, replServerPort2, |
| | | "exportAndImportservice2", 100, "localhost:" + replServerPort1); |
| | | |
| | | Set<String> servers1 = newSet("localhost:" + replServerPort1); |
| | | Set<String> servers2 = newSet("localhost:" + replServerPort2); |
| | | SortedSet<String> servers1 = newSortedSet("localhost:" + replServerPort1); |
| | | SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2); |
| | | |
| | | StringBuilder exportedDataBuilder = new StringBuilder(); |
| | | for (int i =0; i<ENTRYCOUNT; i++) |
| | |
| | | } |
| | | String exportedData=exportedDataBuilder.toString(); |
| | | domain1 = new FakeReplicationDomain( |
| | | testService, 1, servers1, |
| | | 100, 0, exportedData, null, ENTRYCOUNT); |
| | | testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT); |
| | | |
| | | StringBuilder importedData = new StringBuilder(); |
| | | domain2 = new FakeReplicationDomain( |
| | | testService, 2, servers2, 100, 0, |
| | | null, importedData, 0); |
| | | testService, 2, servers2, 0, null, importedData, 0); |
| | | |
| | | domain2.initializeFromRemote(1); |
| | | |
| | |
| | | |
| | | try |
| | | { |
| | | SortedSet<String> servers = new TreeSet<String>(); |
| | | servers.add(HOST1 + SENDERPORT); |
| | | servers.add(HOST2 + RECEIVERPORT); |
| | | SortedSet<String> servers = |
| | | newSortedSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT); |
| | | |
| | | replServer = createReplicationServer(replServerID, SENDERPORT, |
| | | "ReplicationDomainTestDb", 100, servers); |
| | | |
| | | BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain1 = new FakeStressReplicationDomain( |
| | | testService, 2, servers, 100, 1000, rcvQueue1); |
| | | testService, 2, servers, 1000, rcvQueue1); |
| | | |
| | | System.out.println("waiting"); |
| | | Thread.sleep(1000000000); |
| | |
| | | |
| | | try |
| | | { |
| | | SortedSet<String> servers = new TreeSet<String>(); |
| | | servers.add(HOST1 + SENDERPORT); |
| | | servers.add(HOST2 + RECEIVERPORT); |
| | | SortedSet<String> servers = |
| | | newSortedSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT); |
| | | |
| | | replServer = createReplicationServer(replServerID, RECEIVERPORT, |
| | | "ReplicationDomainTestDb", 100, servers); |
| | | |
| | | BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); |
| | | domain1 = new FakeStressReplicationDomain( |
| | | testService, 1, servers, 100, 100000, rcvQueue1); |
| | | testService, 1, servers, 100000, rcvQueue1); |
| | | /* |
| | | * Trigger a total update from domain1 to domain2. |
| | | * Check that the exported data is correctly received on domain2. |