| | |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | | |
| | | /** |
| | |
| | | private ServerHandler handler; |
| | | private ReplicationServerDomain replicationServerDomain; |
| | | private short serverId; |
| | | private short protocolVersion = -1; |
| | | |
| | | /** |
| | | * Create a ServerWriter. |
| | |
| | | ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(handler.toString() + " writer"); |
| | | super("Replication Writer for " + handler.toString() + " in RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId()); |
| | | |
| | | this.serverId = serverId; |
| | | this.session = session; |
| | | this.handler = handler; |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | // Keep protocol version locally for efficiency |
| | | this.protocolVersion = handler.getProtocolVersion(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | UpdateMessage update = replicationServerDomain.take(this.handler); |
| | | UpdateMsg update = replicationServerDomain.take(this.handler); |
| | | if (update == null) |
| | | return; /* this connection is closing */ |
| | | |
| | | // Ignore update to be sent to a replica with a bad generation ID |
| | | long referenceGenerationId = replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId != handler.getGenerationId()) |
| | | || (referenceGenerationId == -1) |
| | | || (handler.getGenerationId() == -1)) |
| | | /* Ignore updates in some cases */ |
| | | if (handler.isLDAPserver()) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO.get( |
| | | this.replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(), |
| | | update.getDn(), |
| | | this.handler.getMonitorInstanceName(), |
| | | /** |
| | | * Ignore updates to a DS in BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS. |
| | | * |
| | | * The RSD lock is deliberately not taken here: a delay between the |
| | | * moment the server enters a wrong status and the moment we detect it |
| | | * is acceptable, because the updates that slip through during that |
| | | * window have no impact on the remote server. The check is still |
| | | * worthwhile, since it avoids saturating the network with unnecessary |
| | | * updates, and not taking the RSD lock gives better performance in |
| | | * normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) |
| | | { |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_TO_DS_BADGENID.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | update.getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(handler.getGenerationId()), |
| | | Long.toString(referenceGenerationId))); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | logError(ERR_IGNORING_UPDATE_TO_DS_FULLUP.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | update.getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()))); |
| | | continue; |
| | | } |
| | | } else |
| | | { |
| | | /** |
| | | * Ignore updates to RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | long referenceGenerationId = |
| | | replicationServerDomain.getGenerationId(); |
| | | if ((referenceGenerationId != handler.getGenerationId()) || |
| | | (referenceGenerationId == -1) || (handler.getGenerationId() == -1)) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO_RS.get( |
| | | Short.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId()), |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | update.getChangeNumber().toString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(handler.getGenerationId()), |
| | | Long.toString(referenceGenerationId))); |
| | | continue; |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | /* |
| | |
| | | " generationId=" + handler.getGenerationId()); |
| | | } |
| | | */ |
| | | session.publish(update); |
| | | |
| | | // Publish the update to the remote server using a protocol version |
| | | // that it supports |
| | | short pduProtocolVersion = update.getVersion(); |
| | | if (protocolVersion < pduProtocolVersion) |
| | | { // The remote server wants a lower protocol version than the PDU one, |
| | | // send it the PDU, serializing it with the supported older version |
| | | session.publish(update, protocolVersion); |
| | | } else { |
| | | session.publish(update); |
| | | } |
| | | } |
| | | } |
| | | catch (NoSuchElementException e) |
| | |
| | | * The remote host has disconnected and this particular Tree is going to |
| | | * be removed, just ignore the exception and let the thread die as well |
| | | */ |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(), |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId())); |
| | | logError(message); |
| | | } |
| | | catch (SocketException e) |
| | |
| | | * The remote host has disconnected and this particular Tree is going to |
| | | * be removed, just ignore the exception and let the thread die as well |
| | | */ |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(), |
| | | Short.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId())); |
| | | logError(message); |
| | | } |
| | | catch (Exception e) |