| | |
| | | import java.util.Set; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | |
| | | configDn = configuration.dn(); |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | |
| | | /* |
| | | * Fill assured configuration properties |
| | | */ |
| | | AssuredType assuredType = configuration.getAssuredType(); |
| | | switch (assuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | setAssured(false); |
| | | break; |
| | | case SAFE_DATA: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_DATA_MODE); |
| | | break; |
| | | case SAFE_READ: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_READ_MODE); |
| | | break; |
| | | } |
| | | setAssuredSdLevel((byte)configuration.getAssuredSdLevel()); |
| | | setAssuredTimeout(configuration.getAssuredTimeout()); |
| | | // Get assured configuration |
| | | readAssuredConfig(configuration); |
| | | |
| | | setGroupId((byte)configuration.getGroupId()); |
| | | setURLs(configuration.getReferralsUrl()); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets and stores the assured replication configuration parameters. Returns |
| | | * a boolean indicating if the passed configuration has changed compared to |
| | | * previous values and the changes require a reconnection. |
| | | * @param configuration The configuration object |
| | | * @return True if the assured configuration changed and we need to reconnect |
| | | */ |
| | | private boolean readAssuredConfig(ReplicationDomainCfg configuration) |
| | | { |
| | | boolean needReconnect = false; |
| | | |
| | | byte newSdLevel = (byte) configuration.getAssuredSdLevel(); |
| | | if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) && |
| | | (newSdLevel != getAssuredSdLevel())) |
| | | { |
| | | needReconnect = true; |
| | | } |
| | | |
| | | AssuredType newAssuredType = configuration.getAssuredType(); |
| | | switch (newAssuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | if (isAssured()) |
| | | { |
| | | needReconnect = true; |
| | | } |
| | | break; |
| | | case SAFE_DATA: |
| | | if (!isAssured() || |
| | | (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE))) |
| | | { |
| | | needReconnect = true; |
| | | } |
| | | break; |
| | | case SAFE_READ: |
| | | if (!isAssured() || |
| | | (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE))) |
| | | { |
| | | needReconnect = true; |
| | | } |
| | | break; |
| | | } |
| | | |
| | | switch (newAssuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | setAssured(false); |
| | | break; |
| | | case SAFE_DATA: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_DATA_MODE); |
| | | break; |
| | | case SAFE_READ: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_READ_MODE); |
| | | break; |
| | | } |
| | | setAssuredSdLevel(newSdLevel); |
| | | |
| | | // Changing timeout does not require restart as it is not sent in |
| | | // StartSessionMsg |
| | | setAssuredTimeout(configuration.getAssuredTimeout()); |
| | | |
| | | return needReconnect; |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * |
| | | * @return The base DN of this ReplicationDomain |
| | |
| | | |
| | | if (!op.isSynchronizationOperation()) |
| | | { |
| | | // If assured replication is configured, this will prepare blocking |
| | | // mechanism. If assured replication is disabled, this returns |
| | | // immediately |
| | | prepareWaitForAckIfAssuredEnabled(msg); |
| | | |
| | | pendingChanges.pushCommittedChanges(); |
| | | |
| | | // If assured replication is enabled, this will wait for the matching |
| | | // ack or time out. If assured replication is disabled, this returns |
| | | // immediately |
| | | try |
| | | { |
| | | waitForAckIfAssuredEnabled(msg); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(), |
| | | Long.toString(getAssuredTimeout()), msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | changeConfig( |
| | | configuration.getReplicationServer(), |
| | | configuration.getWindowSize(), |
| | | configuration.getHeartbeatInterval()); |
| | | configuration.getHeartbeatInterval(), |
| | | (byte)configuration.getGroupId()); |
| | | |
| | | // Get assured configuration |
| | | boolean needReconnect = readAssuredConfig(configuration); |
| | | |
| | | // Reconnect if required |
| | | if (needReconnect) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | } |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |