| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super("Replication Reader for " + handler.toString() + " in RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId()); |
| | | super("Replication Reader Thread for handler of " + |
| | | handler.toString() + |
| | | " in " + replicationServerDomain); |
| | | this.session = session; |
| | | this.serverId = serverId; |
| | | this.handler = handler; |
| | |
| | | */ |
| | | public void run() |
| | | { |
| | | Message errMessage = null; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer() ? " RS " : " LS") + |
| | | " reader starting for serverId=" + serverId); |
| | | TRACER.debugInfo(this.getName() + " starting"); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | |
| | | { |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | | /* |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" From RS ":" From LS")+ |
| | | " with serverId=" + serverId + " receives " + msg); |
| | | TRACER.debugInfo(this.getName() + " receives " + msg); |
| | | } |
| | | */ |
| | | |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | |
| | | { |
| | | boolean filtered = false; |
| | | /* Ignore updates in some cases */ |
| | | if (handler.isLDAPserver()) |
| | | if (handler.isDataServer()) |
| | | { |
| | | /** |
| | | * Ignore updates from DS in bad BAD_GENID_STATUS or |
| | |
| | | } else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg) msg; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, |
| | | handler, true); |
| | | try |
| | | { |
| | | ReplicationServerHandler rsh = (ReplicationServerHandler)handler; |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, |
| | | rsh, true); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | errMessage = |
| | | ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get( |
| | | "TopologyMsg", "other"); |
| | | logError(errMessage); |
| | | } |
| | | } else if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; |
| | | replicationServerDomain.processNewStatus(handler, csMsg); |
| | | try |
| | | { |
| | | DataServerHandler dsh = (DataServerHandler)handler; |
| | | replicationServerDomain.processNewStatus(dsh, csMsg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | errMessage = |
| | | ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get( |
| | | replicationServerDomain.getBaseDn(), |
| | | Short.toString(handler.getServerId()), |
| | | csMsg.toString()); |
| | | logError(errMessage); |
| | | } |
| | | } else if (msg instanceof MonitorRequestMsg) |
| | | { |
| | | MonitorRequestMsg replServerMonitorRequestMsg = |
| | |
| | | * The remote server has sent an unknown message, |
| | | * close the conenction. |
| | | */ |
| | | Message message = NOTE_READER_NULL_MSG.get(handler.toString()); |
| | | logError(message); |
| | | errMessage = NOTE_READER_NULL_MSG.get(handler.toString()); |
| | | logError(errMessage); |
| | | return; |
| | | } |
| | | } catch (NotSupportedOldVersionPDUException e) |
| | |
| | | getMonitorInstanceName() + ":" + e.getMessage()); |
| | | } |
| | | } |
| | | } catch (IOException e) |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | /* |
| | | * The connection has been broken |
| | |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION for serverID=" + serverId + |
| | | " reader IO EXCEPTION for serverID=" + serverId + " " + |
| | | this + " " + |
| | | stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(), |
| | | errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(), |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId())); |
| | | logError(message); |
| | | } catch (ClassNotFoundException e) |
| | | logError(errMessage); |
| | | } |
| | | catch (ClassNotFoundException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | | */ |
| | | Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | errMessage = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(errMessage); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId + |
| | | stackTraceToSingleLineString(e)); |
| | | " " + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | | */ |
| | | Message message = NOTE_READER_EXCEPTION.get(handler.toString()); |
| | | logError(message); |
| | | } finally |
| | | errMessage = NOTE_READER_EXCEPTION.get(handler.toString()); |
| | | logError(errMessage); |
| | | } |
| | | finally |
| | | { |
| | | /* |
| | | * The thread only exit the loop above is some error condition |
| | | * happen. |
| | | * Attempt to close the socket and stop the server handler. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader for serverID=" + serverId + |
| | | " is closing the session"); |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | this + " is closing the session"); |
| | | session.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | | } |
| | | replicationServerDomain.stopServer(handler); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this.getName() + " stopped " + errMessage); |
| | | } |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer() ? " RS" : " LDAP") + |
| | | " server reader stopped for serverID=" + serverId); |
| | | } |
| | | } |