| | |
| | | import java.net.InetAddress; |
| | | import java.net.InetSocketAddress; |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | |
| | | import org.opends.server.changelog.ProtocolSession; |
| | | import org.opends.server.changelog.SocketSession; |
| | |
| | | import org.opends.server.protocols.internal.InternalSearchListener; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | |
| | | { |
| | | private boolean shutdown = false; |
| | | private List<String> servers; |
| | | private Short identifier; |
| | | private boolean connected = false; |
| | | private SynchronizationDomain domain; |
| | | private final Object lock = new Object(); |
| | | private String changelogServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private ProtocolSession session = null; |
| | | private final ServerState state; |
| | | private final DN baseDn; |
| | | private final short serverID; |
| | | private int maxSendDelay; |
| | | private int maxReceiveDelay; |
| | | private int maxSendQueue; |
| | | private int maxReceiveQueue; |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | | * The negotiation parameters are only stored here; the set of replay |
| | | * operations is initialised as an empty TreeSet ordered by a |
| | | * FakeOperationComparator. |
| | | * |
| | | * @param domain The SynchronizationDomain for which the broker is created. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * @param serverID The server ID that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * @param maxReceiveQueue The maximum size of the receive queue to use on |
| | | * the changelog server. |
| | | * @param maxReceiveDelay The maximum replication delay to use on the |
| | | * changelog server. |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | */ |
| | | public ChangelogBroker(SynchronizationDomain domain) |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay ) |
| | | { |
| | | this.domain = domain; |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.state = state; |
| | | replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Start the ChangelogBroker. |
| | | * |
| | | * @param identifier identifier of the changelog |
| | | * @param servers list of servers used |
| | | * @throws Exception : in case of errors |
| | | */ |
| | | public void start(Short identifier, |
| | | List<String> servers) |
| | | public void start(List<String> servers) |
| | | throws Exception |
| | | { |
| | | /* |
| | | * Open Socket to the Changelog |
| | | * Send the Start message |
| | | */ |
| | | this.identifier = identifier; |
| | | this.servers = servers; |
| | | if (servers.size() < 1) |
| | | { |
| | |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = domain.newServerStartMessage(); |
| | | ServerStartMessage msg = new ServerStartMessage( serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | state); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | * those changes and send them again to any changelog server. |
| | | */ |
| | | ChangeNumber changelogMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(identifier); |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (changelogMaxChangeNumber == null) |
| | | changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier); |
| | | ChangeNumber ourMaxChangeNumber = domain.getMaxChangeNumber(); |
| | | changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | { |
| | |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | InternalSearchOperation op = conn.processSearch( |
| | | new ASN1OctetString(domain.getBaseDN().toString()), |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | |
| | | /** |
| | | * Receive a message. |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | */ |
| | | public SynchronizationMessage receive() |
| | | public SynchronizationMessage receive() throws SocketTimeoutException |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | |
| | | return session.receive(); |
| | | } catch (Exception e) |
| | | { |
| | | if (e instanceof SocketTimeoutException) |
| | | { |
| | | SocketTimeoutException e1 = (SocketTimeoutException) e; |
| | | throw e1; |
| | | } |
| | | if (shutdown == false) |
| | | { |
| | | synchronized (lock) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set a timeout value on the current protocol session. |
| | | * With this option set to a non-zero value, calls to the receive() method |
| | | * block for only this amount of time after which a |
| | | * java.net.SocketTimeoutException is raised. |
| | | * The Broker is valid and usable even after such an Exception is raised. |
| | | * The call is forwarded unchanged to the session's own setSoTimeout. |
| | | * NOTE(review): the session field is initialised to null; calling this |
| | | * method before a session has been established would presumably fail |
| | | * with a NullPointerException - confirm against the connection logic. |
| | | * |
| | | * @param timeout the specified timeout, in milliseconds. |
| | | * @throws SocketException if there is an error in the underlying protocol, |
| | | * such as a TCP error. |
| | | */ |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | session.setSoTimeout(timeout); |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the changelog server to which this broker is currently |
| | | * connected. |
| | | * |