| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | |
| | | import org.opends.server.types.SearchResultReference; |
| | | import org.opends.server.types.SearchScope; |
| | | |
| | | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | | */ |
| | | public class ReplicationBroker implements InternalSearchListener |
| | | { |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private boolean shutdown = false; |
| | | private Collection<String> servers; |
| | | private boolean connected = false; |
| | |
| | | private long generationId = -1; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | // Trick to avoid an inner class for returning several values from the |
| | | // performHandshake method. |
| | | private String tmpReadableServerName = null; |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | |
| | | /** |
| | | * A thread to monitor heartbeats on the session. |
| | | */ |
| | | private HeartbeatMonitor heartbeatMonitor = null; |
| | | |
| | | /** |
| | | * The number of times the connection was lost. |
| | | */ |
| | | private int numLostConnections = 0; |
| | | |
| | | /** |
| | | * When the broker cannot connect to any replication server |
| | | * it logs an error and keeps retrying every second. |
| | |
| | | * finally succeed to connect. |
| | | */ |
| | | private boolean connectionError = false; |
| | | |
| | | private final Object connectPhaseLock = new Object(); |
| | | |
| | | /** |
| | |
| | | * @param replSessionSecurity The session security configuration. |
| | | */ |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval, |
| | | long generationId, ReplSessionSecurity replSessionSecurity) |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval, |
| | | long generationId, ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | this.generationId = generationId; |
| | |
| | | this.connect(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | |
| | | */ |
| | | private void connect() |
| | | { |
| | | ReplServerStartMessage replServerStartMsg = null; |
| | | HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | |
| | | heartbeatMonitor = null; |
| | | } |
| | | |
| | | // checkState is true for the first loop on all replication servers |
| | | // looking for one already up-to-date. |
| | | // If we found some responding replication servers but none up-to-date |
| | | // then we set check-state to false and do a second loop where the first |
| | | // found will be the one elected and then we will update this replication |
| | | // server. |
| | | boolean checkState = true; |
| | | boolean receivedResponse = true; |
| | | |
| | | // TODO: We are doing 2 loops here, opening, closing and reopening sessions to |
| | | // the same servers, at the risk of 'same server id' errors. |
| | | // It would be better to do only one loop, keeping the best candidate while |
| | | // traversing the list of replication servers to connect to. |
| | | if (servers.size()==1) |
| | | { |
| | | checkState = false; |
| | | } |
| | | |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | while ((!connected) && (!shutdown) && (receivedResponse)) |
| | | /* |
| | | * Connect to each replication server and get their ServerState then find |
| | | * out which one is the best to connect to. |
| | | */ |
| | | for (String server : servers) |
| | | { |
| | | receivedResponse = false; |
| | | for (String server : servers) |
| | | { |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | // Connect to server and get reply message |
| | | ReplServerStartMessage replServerStartMsg = |
| | | performHandshake(server, false); |
| | | tmpReadableServerName = null; // Not needed now |
| | | |
| | | // Store reply message in list |
| | | if (replServerStartMsg != null) |
| | | { |
| | | ServerState rsState = replServerStartMsg.getServerState(); |
| | | rsStates.put(server, rsState); |
| | | } |
| | | } // for servers |
| | | |
| | | ReplServerStartMessage replServerStartMsg = null; |
| | | |
| | | if (rsStates.size() > 0) |
| | | { |
| | | |
| | | // At least one server answered, find the best one. |
| | | String bestServer = computeBestReplicationServer(state, rsStates, |
| | | serverID, baseDn); |
| | | |
| | | // Best found, now connect to this one |
| | | replServerStartMsg = performHandshake(bestServer, true); |
| | | |
| | | if (replServerStartMsg != null) |
| | | { |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(ServerAddr, 500); |
| | | session = replSessionSecurity.createClientSession(server, socket); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion, generationId, isSslEncryption); |
| | | session.publish(msg); |
| | | |
| | | |
| | | /* |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | replServerStartMsg = (ReplServerStartMessage) session.receive(); |
| | | receivedResponse = true; |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | /* |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | * If not, try another replicationServer. |
| | | * If no other replicationServer has seen all our changes, recover |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replServerStartMsg.getServerState().getMaxChangeNumber(serverID); |
| | | |
| | | if (replServerMaxChangeNumber == null) |
| | | { |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | } |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | if (checkState == true) |
| | | |
| | | // Replication server is missing some of our changes: let's send |
| | | // them to him. |
| | | replayOperations.clear(); |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = seachForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* This replicationServer is missing some |
| | | * of our changes, we are going to try another server |
| | | * but before log a notice message |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start acepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | */ |
| | | Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server, |
| | | baseDn.toNormalizedString()); |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber(). |
| | | toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | } |
| | | else |
| | | { |
| | | replayOperations.clear(); |
| | | |
| | | // TODO: i18n |
| | | logError(Message.raw("going to search for changes")); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replicationServer and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = seachForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start acepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | */ |
| | | Message message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | } |
| | | else |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | logError(Message.raw("sendingChange")); // TODO: i18n |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | startHeartBeat(); |
| | | logError(Message.raw("changes sent")); // TODO: i18n |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (ConnectException e) |
| | | |
| | | replicationServer = tmpReadableServerName; |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | } catch (IOException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError ) |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | logError(message); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | } |
| | | } |
| | | } |
| | | } // for servers |
| | | |
| | | // We have traversed all the replication servers |
| | | |
| | | if ((!connected) && (checkState == true) && receivedResponse) |
| | | { |
| | | /* |
| | | * We could not find a replicationServer that has seen all the |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | checkState = false; |
| | | } |
| | | } |
| | | |
| | | // We have traversed all the replication servers as many times as needed |
| | | // to find one if one is up and running. |
| | | } // Could perform handshake with best |
| | | } // Reached some servers |
| | | |
| | | if (connected) |
| | | { |
| | | // This server has connected correctly. |
| | | // Log a message to let the administrator know that the failure was |
| | | // resolved. |
| | | // wakeup all the thread that were waiting on the window |
| | | // Wakeup all the thread that were waiting on the window |
| | | // on the previous connection. |
| | | connectionError = false; |
| | | if (sendWindow != null) |
| | | { |
| | | sendWindow.release(Integer.MAX_VALUE); |
| | | } |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((replServerStartMsg.getGenerationId() == this.generationId) || |
| | | (replServerStartMsg.getGenerationId() == -1)) |
| | | (replServerStartMsg.getGenerationId() == -1)) |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.generationId)); |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.generationId)); |
| | | logError(message); |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | Message message = |
| | | NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.generationId), |
| | | Long.toString(replServerStartMsg.getGenerationId())); |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.generationId), |
| | | Long.toString(replServerStartMsg.getGenerationId())); |
| | | logError(message); |
| | | } |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | /* |
| | | * This server could not find any replicationServer |
| | | * It's going to start in degraded mode. |
| | | * Log a message |
| | | * This server could not find any replicationServer. It's going to start |
| | | * in degraded mode. Log a message. |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | checkState = false; |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | | Message message = |
| | | NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); |
| | | NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Connect to the provided server performing the handshake (start messages |
| | | * exchange) and return the reply message from the replication server. |
| | | * |
| | | * @param server Server to connect to. |
| | | * @param keepConnection Do we keep session opened or not after handshake. |
| | | * @return The ReplServerStartMessage the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | public ReplServerStartMessage performHandshake(String server, |
| | | boolean keepConnection) |
| | | { |
| | | ReplServerStartMessage replServerStartMsg = null; |
| | | |
| | | // Parse server string. |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | |
| | | boolean error = false; |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | | tmpReadableServerName = serverAddr.toString(); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(serverAddr, 500); |
| | | session = replSessionSecurity.createClientSession(server, socket); |
| | | boolean isSslEncryption = |
| | | replSessionSecurity.isSslEncryption(server); |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow * 2, heartbeatInterval, state, |
| | | protocolVersion, generationId, isSslEncryption); |
| | | session.publish(msg); |
| | | |
| | | /* |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | replServerStartMsg = (ReplServerStartMessage) session.receive(); |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | replServerStartMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | if (!isSslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | } catch (ConnectException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); |
| | | logError(message); |
| | | } |
| | | error = true; |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | error = true; |
| | | } |
| | | |
| | | // Close session if requested |
| | | if (!keepConnection || error) |
| | | { |
| | | if (session != null) |
| | | { |
| | | try |
| | | { |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // The session was already closed, just ignore. |
| | | } |
| | | session = null; |
| | | } |
| | | if (error) |
| | | { |
| | | replServerStartMsg = null; |
| | | } // Be sure to return null. |
| | | } |
| | | |
| | | return replServerStartMsg; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication server that best fits our need so that we can |
| | | * connect to it. |
| | | * |
| | | * Note: this method put as public static for unit testing purpose. |
| | | * |
| | | * @param myState The local server state. |
| | | * @param rsStates The list of available replication servers and their |
| | | * associated server state. |
| | | * @param serverId The server id for the suffix we are working for. |
| | | * @param baseDn The suffix for which we are working for. |
| | | * @return The computed best replication server. |
| | | */ |
| | | public static String computeBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerState> rsStates, short serverId, DN baseDn) |
| | | { |
| | | |
| | | /* |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | | * RS but without having time to store our own state) regarding our own |
| | | * server id. Then, among them, choose the server that is the most up to |
| | | * date regarding the whole topology. |
| | | * |
| | | * If no server is up to date regarding our own server id, find the one who |
| | | * is the most up to date regarding our server id. |
| | | */ |
| | | |
| | | // Should never happen (sanity check) |
| | | if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) || |
| | | (baseDn == null)) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | String bestServer = null; |
| | | // Servers up to dates with regard to our changes |
| | | HashMap<String, ServerState> upToDateServers = |
| | | new HashMap<String, ServerState>(); |
| | | // Servers late with regard to our changes |
| | | HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>(); |
| | | |
| | | /* |
| | | * Start loop to differenciate up to date servers from late ones. |
| | | */ |
| | | ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId); |
| | | if (myChangeNumber == null) |
| | | { |
| | | myChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | for (String repServer : rsStates.keySet()) |
| | | { |
| | | |
| | | ServerState rsState = rsStates.get(repServer); |
| | | ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId); |
| | | if (rsChangeNumber == null) |
| | | { |
| | | rsChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | |
| | | // Store state in right list |
| | | if (myChangeNumber.olderOrEqual(rsChangeNumber)) |
| | | { |
| | | upToDateServers.put(repServer, rsState); |
| | | } else |
| | | { |
| | | lateOnes.put(repServer, rsState); |
| | | } |
| | | } |
| | | |
| | | if (upToDateServers.size() > 0) |
| | | { |
| | | |
| | | /* |
| | | * Some up to date servers, among them, choose the one that has the |
| | | * maximum number of changes to send us. This is the most up to date one |
| | | * regarding the whole topology. This server is the one which has the less |
| | | * difference with the topology server state. For comparison, we need to |
| | | * compute the difference for each server id with the topology server |
| | | * state. |
| | | */ |
| | | |
| | | Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( |
| | | upToDateServers.size(), |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | |
| | | /* |
| | | * First of all, compute the virtual server state for the whole topology, |
| | | * which is composed of the most up to date change numbers for |
| | | * each server id in the topology. |
| | | */ |
| | | ServerState topoState = new ServerState(); |
| | | for (ServerState curState : upToDateServers.values()) |
| | | { |
| | | |
| | | Iterator<Short> it = curState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | Short sId = it.next(); |
| | | ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); |
| | | if (curSidCn == null) |
| | | { |
| | | curSidCn = new ChangeNumber(0, 0, sId); |
| | | } |
| | | // Update topology state |
| | | topoState.update(curSidCn); |
| | | } |
| | | } // For up to date servers |
| | | |
| | | // Min of the max shifts |
| | | long minShift = -1L; |
| | | for (String upServer : upToDateServers.keySet()) |
| | | { |
| | | |
| | | /* |
| | | * Compute the maximum difference between the time of a server id's |
| | | * change number and the time of the matching server id's change |
| | | * number in the topology server state. |
| | | * |
| | | * Note: we could have used the sequence number here instead of the |
| | | * timestamp, but this would have caused a problem when the sequence |
| | | * number loops and comes back to 0 (computation would have becomen |
| | | * meaningless). |
| | | */ |
| | | long shift = -1L; |
| | | ServerState curState = upToDateServers.get(upServer); |
| | | Iterator<Short> it = curState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | Short sId = it.next(); |
| | | ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); |
| | | if (curSidCn == null) |
| | | { |
| | | curSidCn = new ChangeNumber(0, 0, sId); |
| | | } |
| | | // Cannot be null as checked at construction time |
| | | ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId); |
| | | // Cannot be negative as topoState computed as being the max CN |
| | | // for each server id in the topology |
| | | long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime(); |
| | | if (tmpShift > shift) |
| | | { |
| | | shift = tmpShift; |
| | | } |
| | | } |
| | | |
| | | if ((minShift < 0) // First time in loop |
| | | || (shift < minShift)) |
| | | { |
| | | // This sever is even closer to topo state |
| | | bestServer = upServer; |
| | | minShift = shift; |
| | | } |
| | | } // For up to date servers |
| | | |
| | | } else |
| | | { |
| | | /* |
| | | * We could not find a replication server that has seen all the |
| | | * changes that this server has already processed, |
| | | */ |
| | | // lateOnes cannot be empty |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( |
| | | baseDn.toNormalizedString(), lateOnes.size()); |
| | | logError(message); |
| | | |
| | | // Min of the shifts |
| | | long minShift = -1L; |
| | | for (String lateServer : lateOnes.keySet()) |
| | | { |
| | | |
| | | /* |
| | | * Choose the server who is the closest to us regarding our server id |
| | | * (this is the most up to date regarding our server id). |
| | | */ |
| | | ServerState curState = lateOnes.get(lateServer); |
| | | ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId); |
| | | if (ourSidCn == null) |
| | | { |
| | | ourSidCn = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | // Cannot be negative as our Cn for our server id is strictly |
| | | // greater than those of the servers in late server list |
| | | long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime(); |
| | | |
| | | if ((minShift < 0) // First time in loop |
| | | || (tmpShift < minShift)) |
| | | { |
| | | // This sever is even closer to topo state |
| | | bestServer = lateServer; |
| | | minShift = tmpShift; |
| | | } |
| | | } // For late servers |
| | | } |
| | | |
| | | return bestServer; |
| | | } |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. |
| | | * @param baseDn the base DN |
| | |
| | | * @throws Exception when raised. |
| | | */ |
| | | public static InternalSearchOperation seachForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "("+ Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + fromChangeNumber + ")"); |
| | | "(" + Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + fromChangeNumber + ")"); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | attrs.add(Historical.ENTRYUIDNAME); |
| | | return conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, |
| | | resultListener); |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, |
| | | resultListener); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | session, heartbeatInterval); |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * restart the ReplicationBroker. |
| | | */ |
| | |
| | | } |
| | | } catch (IOException e1) |
| | | { |
| | | // ignore |
| | | // ignore |
| | | } |
| | | |
| | | if (failingSession == session) |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // ignore |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Publish a message to the other servers. |
| | | * @param msg the message to publish |
| | |
| | | { |
| | | boolean done = false; |
| | | |
| | | while (!done) |
| | | while (!done && !shutdown) |
| | | { |
| | | if (connectionError) |
| | | { |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() Publishing a " + |
| | | " message is not possible due to existing connection error."); |
| | | " message is not possible due to existing connection error."); |
| | | } |
| | | |
| | | return; |
| | |
| | | // want to hold off reconnection in case the connection dropped. |
| | | credit = |
| | | currentWindowSemaphore.tryAcquire( |
| | | (long) 500, TimeUnit.MILLISECONDS); |
| | | } |
| | | else |
| | | (long) 500, TimeUnit.MILLISECONDS); |
| | | } else |
| | | { |
| | | credit = true; |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "IO exception raised : " + e.getLocalizedMessage()); |
| | | "IO exception raised : " + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | } catch (InterruptedException e) |
| | | { |
| | | // just loop. |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "Interrupted exception raised." + e.getLocalizedMessage()); |
| | | "Interrupted exception raised." + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Receive a message. |
| | | * This method is not multithread safe and should either always be |
| | |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else |
| | | } else |
| | | { |
| | | if (msg instanceof UpdateMessage) |
| | | { |
| | |
| | | if (shutdown == false) |
| | | { |
| | | Message message = |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); |
| | | logError(message); |
| | | |
| | | debugInfo("ReplicationBroker.receive() " + baseDn + |
| | | " Exception raised." + e + e.getLocalizedMessage()); |
| | | " Exception raised." + e + e.getLocalizedMessage()); |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * stop the server. |
| | | */ |
| | |
| | | replicationServer = "stopped"; |
| | | shutdown = true; |
| | | connected = false; |
| | | if (heartbeatMonitor!= null) |
| | | if (heartbeatMonitor != null) |
| | | { |
| | | heartbeatMonitor.shutdown(); |
| | | } |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | } |
| | | |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | } |
| | | } catch (IOException e) |
| | | {} |
| | | { |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | return replicationServer; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void handleInternalSearchEntry( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultEntry searchEntry) |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultEntry searchEntry) |
| | | { |
| | | /* |
| | | * Only deal with modify operation so far |
| | |
| | | * {@inheritDoc} |
| | | */ |
| | | public void handleInternalSearchReference(
| | | InternalSearchOperation searchOperation,
| | | SearchResultReference searchReference)
| | | InternalSearchOperation searchOperation,
| | | SearchResultReference searchReference)
| | | {
| | | // TODO to be implemented: the body is empty, so search references are currently ignored.
| | | // TODO to be implemented: the body is empty, so search references are currently ignored.
| | | }
| | | |
| | | /** |
| | |
| | | public int getCurrentSendWindow() // returns the permits available in sendWindow, or 0 when not connected
| | | {
| | | if (connected)
| | | {
| | | return sendWindow.availablePermits(); // snapshot of the permits currently available
| | | else
| | | } else
| | | {
| | | return 0; // not connected: report an empty send window
| | | }
| | | }
| | | |
| | | /** |
| | |
| | | return numLostConnections; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Change some config parameters. |
| | | * |
| | |
| | | * @param heartbeatInterval The heartbeat interval. |
| | | */ |
| | | public void changeConfig(Collection<String> replicationServers, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | | this.servers = replicationServers; |
| | | this.maxRcvWindow = window; |
| | |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | // TODO : Changing those parameters requires either starting a new
| | | // session with the replicationServer or renegotiating the parameters that
| | | // were sent in the ServerStart message
| | | // TODO : Changing those parameters requires either starting a new
| | | // session with the replicationServer or renegotiating the parameters that
| | | // were sent in the ServerStart message
| | | } |
| | | |
| | | /** |
| | |
| | | return !connectionError; |
| | | } |
| | | |
| | | private boolean debugEnabled() { return true; } // NOTE(review): appears to be the pre-change one-line rendering of the method below
| | | private boolean debugEnabled() // always reports debugging as enabled
| | | {
| | | return true; // hard-coded, so every debugInfo(...) call guarded by this check is emitted
| | | }
| | | |
| | | private static final void debugInfo(String s) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); |