| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | private int maxRcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private long generationId = -1; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** |
| | |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | | * |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverID and for the domain associated to the provided baseDN. |
| | | * @param replSessionSecurity The session security configuration. |
| | | */ |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity) |
| | | long generationId, ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.halfRcvWindow = window/2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | this.generationId = generationId; |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | } |
| | | |
| | |
| | | */ |
| | | private void connect() |
| | | { |
| | | ReplServerStartMessage startMsg; |
| | | ReplServerStartMessage replServerStartMsg; |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | |
| | | heartbeatMonitor = null; |
| | | } |
| | | |
| | | // checkState is true for the first loop on all replication servers |
| | | // looking for one already up-to-date. |
| | | // If we found some responding replication servers but none up-to-date |
| | | // then we set check-state to false and do a second loop where the first |
| | | // found will be the one elected and then we will update this replication |
| | | // server. |
| | | boolean checkState = true; |
| | | boolean receivedResponse = true; |
| | | |
| | | // TODO: We are doing here 2 loops opening , closing , reopening session to |
| | | // the same servers .. risk to have 'same server id' erros. |
| | | // Would be better to do only one loop, keeping the best candidate while |
| | | // traversing the list of replication servers to connect to. |
| | | if (servers.size()==1) |
| | | { |
| | | checkState = false; |
| | | } |
| | | |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | while ((!connected) && (!shutdown) && (receivedResponse)) |
| | |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion, isSslEncryption); |
| | | protocolVersion, generationId, isSslEncryption); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | replServerStartMsg = (ReplServerStartMessage) session.receive(); |
| | | receivedResponse = true; |
| | | |
| | | /* |
| | |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | startMsg.getVersion()); |
| | | replServerStartMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | if (!isSslEncryption) |
| | |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | replServerStartMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = |
| | |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | break; |
| | |
| | | * of our changes, we are going to try another server |
| | | * but before log a notice message |
| | | */ |
| | | Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server); |
| | | Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server, |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | else |
| | |
| | | else |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()); |
| | | Message message = NOTE_EXCEPTION_STARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } // for servers |
| | | |
| | | // We have traversed all the replication servers |
| | | |
| | | if ((!connected) && (checkState == true) && receivedResponse) |
| | | { |
| | |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(); |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | checkState = false; |
| | | } |
| | | } |
| | | |
| | | // We have traversed all the replication servers as many times as needed |
| | | // to find one if one is up and running. |
| | | |
| | | if (connected) |
| | | { |
| | | // This server has connected correctly. |
| | |
| | | /** |
| | | * restart the ReplicationBroker. |
| | | */ |
| | | private void reStart() |
| | | public void reStart() |
| | | { |
| | | reStart(null); |
| | | reStart(this.session); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param failingSession the socket which failed |
| | | */ |
| | | private void reStart(ProtocolSession failingSession) |
| | | public void reStart(ProtocolSession failingSession) |
| | | { |
| | | try |
| | | { |
| | |
| | | } catch (Exception e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage())); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | // choice than to return without sending the ReplicationMessage |
| | | // and relying on the resend procedure of the connect phase to |
| | | // fix the problem when we finally connect. |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() Publishing a " + |
| | | " message is not possible due to existing connection error."); |
| | | } |
| | | |
| | | return; |
| | | } |
| | | |
| | |
| | | } catch (InterruptedException e1) |
| | | { |
| | | // ignore |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "IO exception raised : " + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // just loop. |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "Interrupted exception raised." + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | if (!connected) |
| | | { |
| | | reStart(); |
| | | reStart(null); |
| | | } |
| | | |
| | | ProtocolSession failingSession = session; |
| | |
| | | Message message = |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); |
| | | logError(message); |
| | | |
| | | debugInfo("ReplicationBroker.receive() " + baseDn + |
| | | " Exception raised." + e + e.getLocalizedMessage()); |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("ReplicationBroker Stop Closing session"); |
| | | debugInfo("ReplicationBroker is stopping. and will" + |
| | | "close the connection"); |
| | | } |
| | | |
| | | if (session != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the value of the generationId for that broker. Normally the |
| | | * generationId is set through the constructor but there are cases |
| | | * where the value of the generationId must be changed while the broker |
| | | * already exist for example after an on-line import. |
| | | * |
| | | * @param generationId The value of the generationId. |
| | | * |
| | | */ |
| | | public void setGenerationId(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the replicationServer to which this broker is currently |
| | | * connected. |
| | | * |
| | |
| | | return !connectionError; |
| | | } |
| | | |
| | | private boolean debugEnabled() { return true; } |
| | | private static final void debugInfo(String s) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); |
| | | TRACER.debugInfo(s); |
| | | } |
| | | |
| | | /** |
| | | * Determine whether the connection to the replication server is encrypted. |
| | | * @return true if the connection is encrypted, false otherwise. |