| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.ReplServerInfoMessage; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | |
| | | * We add a new TreeSet in the HashMap when a new replication server |
| | | * registers with this replication server. |
| | | */ |
| | | |
| | | private Map<Short, ServerHandler> replicationServers = |
| | | new ConcurrentHashMap<Short, ServerHandler>(); |
| | | |
| | |
| | | return false; |
| | | } |
| | | connectedServers.put(handler.getServerId(), handler); |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | |
| | | return true; |
| | | } |
| | | } |
| | |
| | | if (handler.isReplicationServer()) |
| | | replicationServers.remove(handler.getServerId()); |
| | | else |
| | | { |
| | | connectedServers.remove(handler.getServerId()); |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | return false; |
| | | } |
| | | replicationServers.put(handler.getServerId(), handler); |
| | | |
| | | // Update this server with the list of LDAP servers |
| | | // already connected |
| | | handler.sendInfo( |
| | | new ReplServerInfoMessage(getConnectedLDAPservers())); |
| | | |
| | | return true; |
| | | } |
| | | } |
| | |
| | | return sourceDbHandlers.keySet(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the server IDs of the LDAP servers currently connected to us. |
| | | * Each element is the string form of one connected server's ID, as |
| | | * produced by String.valueOf(handler.getServerId()). |
| | | * |
| | | * @return A newly allocated list holding one entry per handler found in |
| | | * connectedServers; empty when that map holds no handler. |
| | | */ |
| | | public List<String> getConnectedLDAPservers() |
| | | { |
| | | // Pre-size from the current number of handlers. This is only a |
| | | // capacity hint: the list still grows if more handlers are seen. |
| | | List<String> serverIds = |
| | | new ArrayList<String>(connectedServers.size()); |
| | | |
| | | for (ServerHandler handler : connectedServers.values()) |
| | | { |
| | | serverIds.add(String.valueOf(handler.getServerId())); |
| | | } |
| | | return serverIds; |
| | | } |
| | | |
| | | /** |
| | | * Creates and returns an iterator. |
| | |
| | | protected List<ServerHandler> getDestinationServers(RoutableMessage msg, |
| | | ServerHandler senderHandler) |
| | | { |
| | | |
| | | List<ServerHandler> servers = |
| | | new ArrayList<ServerHandler>(); |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "getDestinationServers" |
| | | + " msgDest:" + msg.getDestination() , 1); |
| | | |
| | | if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) |
| | | { |
| | | // TODO Import from the "closest server" to be implemented |
| | |
| | | } |
| | | } |
| | | |
| | | // Send to all connected LDAP servers |
| | | // Sends to all connected LDAP servers |
| | | for (ServerHandler destinationHandler : connectedServers.values()) |
| | | { |
| | | // Don't loop on the sender |
| | |
| | | else |
| | | { |
| | | // the targeted server is NOT connected |
| | | // Let's search for THE changelog server that MAY |
| | | // have the targeted server connected. |
| | | if (senderHandler.isLDAPserver()) |
| | | { |
| | | // let's forward to the other changelogs |
| | | servers.addAll(replicationServers.values()); |
| | | for (ServerHandler h : replicationServers.values()) |
| | | { |
| | | if (h.isRemoteLDAPServer(msg.getDestination())) |
| | | { |
| | | servers.add(h); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return servers; |
| | | } |
| | | |
| | |
| | | |
| | | if (servers.isEmpty()) |
| | | { |
| | | if (!(msg instanceof InitializeRequestMessage)) |
| | | { |
| | | // TODO A more elaborated policy is probably needed |
| | | } |
| | | else |
| | | { |
| | | ErrorMessage errMsg = new ErrorMessage( |
| | | msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, |
| | | "serverID:" + msg.getDestination()); |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | } |
| | | } |
| | | return; |
| | | } |
| | | |
| | | for (ServerHandler targetHandler : servers) |
| | | { |
| | | ErrorMessage errMsg = new ErrorMessage( |
| | | msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, |
| | | "serverID:" + msg.getDestination()); |
| | | try |
| | | { |
| | | targetHandler.send(msg); |
| | | senderHandler.send(errMsg); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | /* |
| | | * An error happened trying the send back an ack to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR; |
| | | String message = getMessage(msgID, this.toString()) |
| | | + stackTraceToSingleLineString(ioe); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | senderHandler.shutdown(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (ServerHandler targetHandler : servers) |
| | | { |
| | | try |
| | | { |
| | | targetHandler.send(msg); |
| | | } |
| | | catch(IOException ioe) |
| | | { |
| | | /* |
| | | * An error happened trying the send back an ack to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG; |
| | | String message = getMessage(msgID, this.toString()) |
| | | + stackTraceToSingleLineString(ioe) + " " |
| | | + msg.getClass().getCanonicalName(); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | senderHandler.shutdown(); |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Send a ReplServerInfoMessage to all the connected replication servers |
| | | * in order to let them know our connected LDAP servers. |
| | | */ |
| | | private void sendReplServerInfo() |
| | | { |
| | | ReplServerInfoMessage info = |
| | | new ReplServerInfoMessage(getConnectedLDAPservers()); |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | try |
| | | { |
| | | handler.sendInfo(info); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | /* |
| | | * An error happened trying the send back an ack to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_ERROR_SENDING_INFO; |
| | | String message = getMessage(msgID, this.toString()) |
| | | + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | handler.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Records, on the given handler, the replication server information |
| | | * carried by the given ReplServerInfoMessage. The work is delegated to |
| | | * the handler's own setReplServerInfo method. |
| | | * |
| | | * @param handler The server handler from which the info was received. |
| | | * @param infoMsg The information message that was received. |
| | | */ |
| | | public void setReplServerInfo(ServerHandler handler, |
| | | ReplServerInfoMessage infoMsg) |
| | | { |
| | | handler.setReplServerInfo(infoMsg); |
| | | } |
| | | } |