| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.WindowMessage; |
| | | import org.opends.server.replication.protocol.WindowProbe; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | |
| | | private boolean shutdown = false; |
| | | private Collection<String> servers; |
| | | private boolean connected = false; |
| | | private final Object lock = new Object(); |
| | | private String replicationServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private ProtocolSession session = null; |
| | |
| | | private int numLostConnections = 0; |
| | | |
| | | /** |
| | | * When the broker cannot connect to any replication server |
| | | * it logs an error and keeps retrying every second. |
| | | * This boolean is set when the first failure happens and is used |
| | | * to avoid repeating the error message on further failures to connect. |
| | |
| | | */ |
| | | private boolean connectionError = false; |
| | | |
| | | private Object connectPhaseLock = new Object(); |
| | | |
| | | /** |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | | * |
| | |
| | | |
| | | boolean checkState = true; |
| | | boolean receivedResponse = true; |
| | | while ((!connected) && (!shutdown) && (receivedResponse)) |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | receivedResponse = false; |
| | | for (String server : servers) |
| | | while ((!connected) && (!shutdown) && (receivedResponse)) |
| | | { |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | |
| | | try |
| | | receivedResponse = false; |
| | | for (String server : servers) |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SocketSession(socket); |
| | | int separator = server.lastIndexOf(':'); |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion); |
| | | session.publish(msg); |
| | | |
| | | |
| | | /* |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | receivedResponse = true; |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | startMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous changes. |
| | | * If not, try another replicationServer. |
| | | * If no other replicationServer has seen all our changes, recover |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | try |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | if (checkState == true) |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SocketSession(socket); |
| | | |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion); |
| | | session.publish(msg); |
| | | |
| | | |
| | | /* |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | receivedResponse = true; |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | startMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | * If not, try another replicationServer. |
| | | * If no other replicationServer has seen all our changes, recover |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | /* This replicationServer is missing some |
| | | * of our changes, we are going to try another server |
| | | * but first log a notice message |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_MISSING_CHANGES; |
| | | String message = getMessage(msgID, server); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | replayOperations.clear(); |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replicationServer and update it |
| | | */ |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "("+ Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + replServerMaxChangeNumber + ")"); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | InternalSearchOperation op = conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | if (checkState == true) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server therefore can't start accepting new updates. |
| | | * TODO : should stop the LDAP server (how to ?) |
| | | /* This replicationServer is missing some |
| | | * of our changes, we are going to try another server |
| | | * but first log a notice message |
| | | */ |
| | | int msgID = MSGID_CANNOT_RECOVER_CHANGES; |
| | | String message = getMessage(msgID); |
| | | int msgID = MSGID_CHANGELOG_MISSING_CHANGES; |
| | | String message = getMessage(msgID, server); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.FATAL_ERROR, |
| | | message, msgID); |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | replayOperations.clear(); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "going to search for changes", 1); |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replicationServer and update it |
| | | */ |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "("+ Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + replServerMaxChangeNumber + ")"); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | InternalSearchOperation op = conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | publish(replayOp.generateMessage()); |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting new updates again but |
| | | * some inconsistencies will stay between servers. |
| | | * TODO : REPAIR : log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | */ |
| | | int msgID = MSGID_CANNOT_RECOVER_CHANGES; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.FATAL_ERROR, |
| | | message, msgID); |
| | | } |
| | | startHeartBeat(); |
| | | break; |
| | | else |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "sendingChange", 1); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | startHeartBeat(); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "changes sent", 1); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (ConnectException e) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | if (!connectionError ) |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; |
| | | String message = getMessage(msgID, server); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_STARTING_SESSION; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message + stackTraceToSingleLineString(e), msgID); |
| | | } |
| | | finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | | if (session != null) |
| | | { |
| | | session.close(); |
| | | session = null; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (ConnectException e) |
| | | |
| | | if ((!connected) && (checkState == true) && receivedResponse) |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next replicationServer in the list |
| | | * We could not find a replicationServer that has seen all the |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | if (!connectionError ) |
| | | { |
| | | // the error message is only logged once to avoid overflowing |
| | | // the error log |
| | | int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; |
| | | String message = getMessage(msgID, server); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_STARTING_SESSION; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | finally |
| | | { |
| | | if (connected == false) |
| | | { |
| | | if (session != null) |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Broker : connect closing session" , 1); |
| | | |
| | | session.close(); |
| | | session = null; |
| | | } |
| | | } |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | checkState = false; |
| | | } |
| | | } |
| | | |
| | | if ((!connected) && (checkState == true) && receivedResponse) |
| | | if (connected) |
| | | { |
| | | // This server has connected correctly. |
| | | // Log a message to let the administrator know that the failure was |
| | | // resolved. |
| | | // wake up all the threads that were waiting on the window |
| | | // on the previous connection. |
| | | connectionError = false; |
| | | if (sendWindow != null) |
| | | sendWindow.release(Integer.MAX_VALUE); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connectPhaseLock.notify(); |
| | | int msgID = MSGID_NOW_FOUND_CHANGELOG; |
| | | String message = |
| | | getMessage(msgID, replicationServer, baseDn.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, message, msgID); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * We could not find a replicationServer that has seen all the |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any replicationServer. |
| | | * This server could not find any replicationServer |
| | | * It's going to start in degraded mode. |
| | | * Log a message |
| | | */ |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES; |
| | | String message = getMessage(msgID); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | try |
| | | if (!connectionError) |
| | | { |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | checkState = false; |
| | | connectionError = true; |
| | | connectPhaseLock.notify(); |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; |
| | | String message = getMessage(msgID, baseDn.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, message, msgID); |
| | | } |
| | | checkState = false; |
| | | } |
| | | } |
| | | |
| | | if (connected) |
| | | { |
| | | // This server has connected correctly. |
| | | // let's check if it was previously in error; in that case log |
| | | // a message to let the administrator know that the failure was resolved. |
| | | if (connectionError) |
| | | { |
| | | connectionError = false; |
| | | int msgID = MSGID_NOW_FOUND_CHANGELOG; |
| | | String message = getMessage(msgID, baseDn.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, message, msgID); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * This server could not find any replicationServer |
| | | * It's going to start in degraded mode. |
| | | * Log a message |
| | | */ |
| | | if (!connectionError) |
| | | { |
| | | checkState = false; |
| | | connectionError = true; |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; |
| | | String message = getMessage(msgID, baseDn.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, message, msgID); |
| | | } |
| | | } |
| | | } |
| | |
| | | public void publish(ReplicationMessage msg) |
| | | { |
| | | boolean done = false; |
| | | ProtocolSession failingSession = session; |
| | | |
| | | while (!done) |
| | | { |
| | | if (connectionError) |
| | | return; |
| | | synchronized (lock) |
| | | { |
| | | try |
| | | // It was not possible to connect to any replication server. |
| | | // Since the operation was already processed, we have no other |
| | | // choice than to return without sending the ReplicationMessage |
| | | // and relying on the resend procedure of the connect phase to |
| | | // fix the problem when we finally connect. |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | boolean credit; |
| | | ProtocolSession current_session; |
| | | Semaphore currentWindowSemaphore; |
| | | |
| | | // save the session at the time when we acquire the |
| | | // sendwindow credit so that we can make sure later |
| | | // that the session did not change in between. |
| | | // This is necessary to make sure that we don't publish a message |
| | | // on a session with a credit that was acquired from a previous |
| | | // session. |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | if (this.connected == false) |
| | | this.reStart(failingSession); |
| | | if (msg instanceof UpdateMessage) |
| | | sendWindow.acquire(); |
| | | session.publish(msg); |
| | | done = true; |
| | | } catch (IOException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | current_session = session; |
| | | currentWindowSemaphore = sendWindow; |
| | | } |
| | | catch (InterruptedException e) |
| | | |
| | | if (msg instanceof UpdateMessage) |
| | | { |
| | | this.reStart(failingSession); |
| | | // Acquiring the window credit must be done outside of the |
| | | // connectPhaseLock because it can be blocking and we don't |
| | | // want to hold off reconnection in case the connection dropped. |
| | | credit = |
| | | currentWindowSemaphore.tryAcquire( |
| | | (long) 500, TimeUnit.MILLISECONDS); |
| | | } |
| | | else |
| | | { |
| | | credit = true; |
| | | } |
| | | if (credit) |
| | | { |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | // check the session. If it has changed, some |
| | | // disconnection/reconnection happened and we need to restart from |
| | | // scratch. |
| | | if (session == current_session) |
| | | { |
| | | session.publish(msg); |
| | | done = true; |
| | | } |
| | | } |
| | | } |
| | | if (!credit) |
| | | { |
| | | // the window is still closed. |
| | | // Send a WindowProbe message to wake up the receiver in case the |
| | | // window update message was lost somehow... |
| | | // then loop to check again if connection was closed. |
| | | session.publish(new WindowProbe()); |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | // The receive threads should handle reconnection or |
| | | // mark this broker in error. Just retry. |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | try |
| | | { |
| | | connectPhaseLock.wait(100); |
| | | } catch (InterruptedException e1) |
| | | { |
| | | // ignore |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // just loop. |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | /** |
| | | * Receive a message. |
| | | * This method is not multithread safe and should either always be |
| | | * called in a single thread or protected by a locking mechanism |
| | | * before being called. |
| | | * |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | |
| | | { |
| | | if (shutdown == false) |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | int msgID = MSGID_DISCONNECTED_FROM_CHANGELOG; |
| | | String message = getMessage(msgID, replicationServer); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | message + " " + e.getMessage(), msgID); |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | | } |