| | |
| | | */ |
| | | package org.opends.server.synchronization.changelog; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.opends.server.messages.MessageHandler.getMessage; |
| | | import static org.opends.server.synchronization.common.LogMessages.*; |
| | | import static org.opends.server.loggers.Error.logError; |
| | | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.synchronization.common.ChangeNumber; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.protocol.AckMessage; |
| | | import org.opends.server.synchronization.protocol.UpdateMessage; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | import org.opends.server.synchronization.common.ChangeNumber; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.protocol.AckMessage; |
| | | import org.opends.server.synchronization.protocol.InitializeRequestMessage; |
| | | import org.opends.server.synchronization.protocol.ErrorMessage; |
| | | import org.opends.server.synchronization.protocol.RoutableMessage; |
| | | import org.opends.server.synchronization.protocol.UpdateMessage; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * This class defines an in-memory cache that will be used to store |
| | | * the messages that have been received from an LDAP server or |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Send back an ack to the server that sent the change. |
| | | * Retrieves the destination handlers for a routable message. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * @param msg The message to route. |
| | | * @param senderHandler The handler of the server that published this message. |
| | | * @return The list of destination handlers. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) |
| | | protected List<ServerHandler> getDestinationServers(RoutableMessage msg, |
| | | ServerHandler senderHandler) |
| | | { |
| | | short serverId = changeNumber.getServerId(); |
| | | sendAck(changeNumber, isLDAPserver, serverId); |
| | | } |
| | | |
| | | /** |
| | | * |
| | | * Send back an ack to a server that sent the change. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * @param serverId The identifier of the server from which we |
| | | * received the change.. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, |
| | | short serverId) |
| | | { |
| | | ServerHandler handler; |
| | | if (isLDAPserver) |
| | | handler = connectedServers.get(serverId); |
| | | else |
| | | handler = changelogServers.get(serverId); |
| | | List<ServerHandler> servers = |
| | | new ArrayList<ServerHandler>(); |
| | | |
| | | // TODO : check for null handler and log error |
| | | try |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "getDestinationServers" |
| | | + " msgDest:" + msg.getDestination() , 1); |
| | | |
| | | if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) |
| | | { |
| | | handler.sendAck(changeNumber); |
| | | } catch (IOException e) |
| | | { |
| | | /* |
| | | * An error happened trying the send back an ack to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; |
| | | String message = getMessage(msgID, this.toString()) |
| | | + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | handler.shutdown(); |
| | | // TODO Import from the "closest server" to be implemented |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ChangelogCache. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | // Close session with other changelogs |
| | | for (ServerHandler serverHandler : changelogServers.values()) |
| | | else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | | |
| | | // Close session with other LDAP servers |
| | | for (ServerHandler serverHandler : connectedServers.values()) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | | |
| | | // Shutdown the dbHandlers |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | if (!senderHandler.isChangelogServer()) |
| | | { |
| | | dbHandler.shutdown(); |
| | | // Send to all changelogServers |
| | | for (ServerHandler destinationHandler : changelogServers.values()) |
| | | { |
| | | servers.add(destinationHandler); |
| | | } |
| | | } |
| | | sourceDbHandlers.clear(); |
| | | |
| | | // Send to all connected LDAP servers |
| | | for (ServerHandler destinationHandler : connectedServers.values()) |
| | | { |
| | | // Don't loop on the sender |
| | | if (destinationHandler == senderHandler) |
| | | continue; |
| | | servers.add(destinationHandler); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // Destination is one server |
| | | ServerHandler destinationHandler = |
| | | connectedServers.get(msg.getDestination()); |
| | | if (destinationHandler != null) |
| | | { |
| | | servers.add(destinationHandler); |
| | | } |
| | | else |
| | | { |
| | | // the targeted server is NOT connected |
| | | if (senderHandler.isLDAPserver()) |
| | | { |
| | | // let's forward to the other changelogs |
| | | servers.addAll(changelogServers.values()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return servers; |
| | | } |
| | | |
| | | /** |
| | | * Returns the ServerState describing the last change from this replica. |
| | | * Process an InitializeRequestMessage. |
| | | * |
| | | * @return The ServerState describing the last change from this replica. |
| | | * @param msg The message received and to be processed. |
| | | * @param senderHandler The server handler of the server that emitted |
| | | * the message. |
| | | */ |
| | | public ServerState getDbServerState() |
| | | public void process(RoutableMessage msg, ServerHandler senderHandler) |
| | | { |
| | | ServerState serverState = new ServerState(); |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | | serverState.update(db.getLastChange()); |
| | | } |
| | | return serverState; |
| | | } |
| | | |
| | | /** |
| | | * Builds the textual identification of this cache. |
| | | * |
| | | * @return The literal "ChangelogCache " followed by the string form |
| | | * of the baseDn field. |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   final String description = "ChangelogCache " + baseDn; |
| | |   return description; |
| | | } |
| | | List<ServerHandler> servers = getDestinationServers(msg, senderHandler); |
| | | |
| | | /** |
| | | * Check if some server Handler should be removed from flow control state. |
| | | * @throws IOException If an error happened. |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | if (servers.isEmpty()) |
| | | { |
| | | handler.checkWindow(); |
| | | if (!(msg instanceof InitializeRequestMessage)) |
| | | { |
| | | // TODO A more elaborated policy is probably needed |
| | | } |
| | | else |
| | | { |
| | | ErrorMessage errMsg = new ErrorMessage( |
| | | msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, |
| | | "serverID:" + msg.getDestination()); |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | } |
| | | } |
| | | return; |
| | | } |
| | | |
| | | for (ServerHandler handler : connectedServers.values()) |
| | | for (ServerHandler targetHandler : servers) |
| | | { |
| | | handler.checkWindow(); |
| | | try |
| | | { |
| | | targetHandler.send(msg); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Check if a server that was in flow control can now restart |
| | | * sending updates. |
| | | * @param sourceHandler The server that must be checked. |
| | | * @return true if the server can restart sending changes. |
| | | * false if the server can't restart sending changes. |
| | | */ |
| | | public boolean restartAfterSaturation(ServerHandler sourceHandler) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | /** |
| | | * Send back an ack to the server that sent the change. |
| | | * The server identifier is read from the ChangeNumber and the call is |
| | | * delegated to the three-argument variant of this method. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) |
| | | { |
| | |   // The server id used to look up the handler comes from the ChangeNumber. |
| | |   short serverId = changeNumber.getServerId(); |
| | |   sendAck(changeNumber, isLDAPserver, serverId); |
| | | } |
| | | |
| | | /** |
| | | * Send back an ack to a server that sent the change. |
| | | * <p> |
| | | * The handler is looked up in connectedServers when isLDAPserver is true |
| | | * and in changelogServers otherwise. When no handler is registered for |
| | | * the given serverId, an error is logged and no ack is sent. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * @param serverId The identifier of the server from which we |
| | | * received the change. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, |
| | | short serverId) |
| | | { |
| | |   ServerHandler handler; |
| | |   if (isLDAPserver) |
| | |     handler = connectedServers.get(serverId); |
| | |   else |
| | |     handler = changelogServers.get(serverId); |
| | | |
| | |   if (handler == null) |
| | |   { |
| | |     /* |
| | |      * The lookup found no handler for this server id, so there is |
| | |      * nobody to ack. Log the problem instead of failing with a |
| | |      * NullPointerException. |
| | |      */ |
| | |     int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; |
| | |     String message = getMessage(msgID, this.toString()) |
| | |         + " no handler found for server id " + serverId; |
| | |     logError(ErrorLogCategory.SYNCHRONIZATION, |
| | |              ErrorLogSeverity.SEVERE_ERROR, |
| | |              message, msgID); |
| | |     return; |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     handler.sendAck(changeNumber); |
| | |   } catch (IOException e) |
| | |   { |
| | |     /* |
| | |      * An error happened trying to send back an ack to this server. |
| | |      * Log an error and close the connection to this server. |
| | |      */ |
| | |     int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; |
| | |     String message = getMessage(msgID, this.toString()) |
| | |         + stackTraceToSingleLineString(e); |
| | |     logError(ErrorLogCategory.SYNCHRONIZATION, |
| | |              ErrorLogSeverity.SEVERE_ERROR, |
| | |              message, msgID); |
| | |     handler.shutdown(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ChangelogCache. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | // Close session with other changelogs |
| | | for (ServerHandler serverHandler : changelogServers.values()) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | | |
| | | // Close session with other LDAP servers |
| | | for (ServerHandler serverHandler : connectedServers.values()) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | | |
| | | // Shutdown the dbHandlers |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | dbHandler.shutdown(); |
| | | } |
| | | sourceDbHandlers.clear(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Builds a ServerState from the last change known by each DbHandler. |
| | | * A new ServerState is created and updated with the value returned by |
| | | * getLastChange() for every entry of sourceDbHandlers. |
| | | * |
| | | * @return The ServerState describing the last change from this replica. |
| | | */ |
| | | public ServerState getDbServerState() |
| | | { |
| | |   final ServerState dbState = new ServerState(); |
| | |   for (DbHandler sourceDb : sourceDbHandlers.values()) |
| | |   { |
| | |     dbState.update(sourceDb.getLastChange()); |
| | |   } |
| | |   return dbState; |
| | | } |
| | | |
| | | /** |
| | | * Builds the textual identification of this cache. |
| | | * |
| | | * @return The literal "ChangelogCache " followed by the string form |
| | | * of the baseDn field. |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   final String description = "ChangelogCache " + baseDn; |
| | |   return description; |
| | | } |
| | | |
| | | /** |
| | | * Check if some server Handler should be removed from flow control state. |
| | | * checkWindow() is invoked on every handler of changelogServers, then on |
| | | * every handler of connectedServers. |
| | | * |
| | | * @throws IOException If an error happened. |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | |   // Changelog servers first |
| | |   for (ServerHandler changelogHandler : changelogServers.values()) |
| | |   { |
| | |     changelogHandler.checkWindow(); |
| | |   } |
| | | |
| | |   // Then the connected LDAP servers |
| | |   for (ServerHandler ldapHandler : connectedServers.values()) |
| | |   { |
| | |     ldapHandler.checkWindow(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Check if a server that was in flow control can now restart |
| | | * sending updates. |
| | | * @param sourceHandler The server that must be checked. |
| | | * @return true if the server can restart sending changes. |
| | | * false if the server can't restart sending changes. |
| | | */ |
| | | public boolean restartAfterSaturation(ServerHandler sourceHandler) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | for (ServerHandler handler : connectedServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | for (ServerHandler handler : connectedServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | return true; |
| | | } |
| | | } |