| File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogBroker.java |
| | |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.ChangelogStartMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ServerStartMessage; |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | | */ |
| | | public class ChangelogBroker implements InternalSearchListener |
| | | public class ReplicationBroker implements InternalSearchListener |
| | | { |
| | | private boolean shutdown = false; |
| | | private Collection<String> servers; |
| | | private boolean connected = false; |
| | | private final Object lock = new Object(); |
| | | private String changelogServer = "Not connected"; |
| | | private String replicationServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private ProtocolSession session = null; |
| | | private final ServerState state; |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular ReplicationDomain. |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | | * |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverID The server ID that should be used by this broker |
| | | * when negotiating the session with the changelog servers. |
| | | * when negotiating the session with the replicationServer. |
| | | * @param maxReceiveQueue The maximum size of the receive queue to use on |
| | | * the changelog server. |
| | | * the replicationServer. |
| | | * @param maxReceiveDelay The maximum replication delay to use on the |
| | | * changelog server. |
| | | * replicationServer. |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * the replicationServer. |
| | | * @param maxSendDelay The maximum send delay to use on the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * changelog server, or zero if no heartbeats are requested. |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Start the ChangelogBroker. |
| | | * Start the ReplicationBroker. |
| | | * |
| | | * @param servers list of servers used |
| | | * @throws Exception : in case of errors |
| | |
| | | throws Exception |
| | | { |
| | | /* |
| | | * Open Socket to the Changelog |
| | | * Open Socket to the ReplicationServer |
| | | * Send the Start message |
| | | */ |
| | | shutdown = false; |
| | |
| | | |
| | | |
| | | /** |
| | | * Connect to a Changelog server. |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * @throws NumberFormatException address was invalid |
| | | * @throws IOException error during connection phase |
| | | */ |
| | | private void connect() throws NumberFormatException, IOException |
| | | { |
| | | ChangelogStartMessage startMsg; |
| | | ReplServerStartMessage startMsg; |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | |
| | | |
| | | |
| | | /* |
| | | * Read the ChangelogStartMessage that should come back. |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ChangelogStartMessage) session.receive(); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a changelog that has not |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the Changelog has seen all our previous changes. |
| | | * If not, try another changelog server. |
| | | * If no other changelog server has seen all our changes, recover |
| | | * those changes and send them again to any changelog server. |
| | | * Check that the ReplicationServer has seen all our previous changes. |
| | | * If not, try another replicationServer. |
| | | * If no other replicationServer has seen all our changes, recover |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber changelogMaxChangeNumber = |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (changelogMaxChangeNumber == null) |
| | | changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | |
| | | { |
| | | if (checkState == true) |
| | | { |
| | | /* This changelog server is missing some |
| | | /* This replicationServer is missing some |
| | | * of our changes, we are going to try another server |
| | | * but first log a notice message |
| | | */ |
| | |
| | | { |
| | | replayOperations.clear(); |
| | | /* |
| | | * Get all the changes that have not been seen by this changelog |
| | | * server and update it |
| | | * Get all the changes that have not been seen by this |
| | | * replicationServer and update it |
| | | */ |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "("+ Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + changelogMaxChangeNumber + ")"); |
| | | ">=dummy:" + replServerMaxChangeNumber + ")"); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | InternalSearchOperation op = conn.processSearch( |
| | |
| | | } |
| | | else |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next changelog server in the list |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; |
| | | String message = getMessage(msgID, server); |
| | |
| | | if (checkState == true) |
| | | { |
| | | /* |
| | | * We could not find a changelog server that has seen all the |
| | | * We could not find a replicationServer that has seen all the |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any changelog server. |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | try |
| | | { |
| | |
| | | else |
| | | { |
| | | /* |
| | | * This server could not find any changelog server |
| | | * This server could not find any replicationServer |
| | | * Let's wait a little and try again. |
| | | */ |
| | | synchronized (this) |
| | |
| | | |
| | | |
| | | /** |
| | | * Restart the Changelog broker after a failure. |
| | | * Restart the ReplicationServer broker after a failure. |
| | | * |
| | | * @param failingSession the socket which failed |
| | | */ |
| | |
| | | */ |
| | | public void stop() |
| | | { |
| | | changelogServer = "stopped"; |
| | | replicationServer = "stopped"; |
| | | shutdown = true; |
| | | connected = false; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ChangelogBroker Stop Closing session"); |
| | | debugInfo("ReplicationBroker Stop Closing session"); |
| | | } |
| | | |
| | | if (session != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the changelog server to which this broker is currently |
| | | * Get the name of the replicationServer to which this broker is currently |
| | | * connected. |
| | | * |
| | | * @return the name of the changelog server to which this domain |
| | | * @return the name of the replicationServer to which this domain |
| | | * is currently connected. |
| | | */ |
| | | public String getChangelogServer() |
| | | public String getReplicationServer() |
| | | { |
| | | return changelogServer; |
| | | return replicationServer; |
| | | } |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | * TODO : implement code for ADD, DEL, MODDN operation |
| | | * |
| | | * Parse all ds-sync-hist attribute values |
| | | * - for each Changenumber>changelogMaxChangeNumber : |
| | | * - for each Changenumber > replication server MaxChangeNumber : |
| | | * build an attribute mod |
| | | * |
| | | */ |
| | |
| | | /** |
| | | * Change some config parameters. |
| | | * |
| | | * @param changelogServers The new list of changelog servers. |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param maxReceiveQueue The max size of receive queue. |
| | | * @param maxReceiveDelay The max receive delay. |
| | | * @param maxSendQueue The max send queue. |
| | |
| | | * @param window The max window size. |
| | | * @param heartbeatInterval The heartbeat interval. |
| | | */ |
| | | public void changeConfig(Collection<String> changelogServers, |
| | | public void changeConfig(Collection<String> replicationServers, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | | this.servers = changelogServers; |
| | | this.servers = replicationServers; |
| | | this.maxRcvWindow = window; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | // TODO : Changing those parameters requires to either restart a new |
| | | // session with the changelog server or renegotiate the parameters that |
| | | // session with the replicationServer or renegotiate the parameters that |
| | | // were sent in the ServerStart message |
| | | } |
| | | } |