  {
    try
    {
      // Acquire lock on domain (see more details in comment of start()
      // method of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }

    try
    {
      if (otherHandlers.contains(handler))
      {
        unRegisterHandler(handler);
        handler.shutdown();
      }
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }

  /**

  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " + this +
          " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
          " for baseDn " + baseDn + ":\n" + genIdMsg);

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }

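    // The rest of the processing runs with the domain lock held (when it
    // could be acquired): any unexpected exception is only logged, and the
    // lock is released in the finally block at the end of this method.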
    try
    {
      long newGenId = genIdMsg.getGenerationId();

      if (newGenId != this.generationId)
      {
        changeGenerationId(newGenId, false);
      }
      else
      {
        // Ordered to take a generation id we already have: just ignore it
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this
              + " Reset generation id requested for baseDn " + baseDn
              + " but generation id was already " + this.generationId
              + ":\n" + genIdMsg);
      }

      // If we are the first replication server warned,
      // then forward the reset message to the remote replication servers
      for (ServerHandler rsHandler : replicationServers.values())
      {
        try
        {
          // Once the message has been sent, the remote RS will adopt
          // the new genId
          rsHandler.setGenerationId(newGenId);
          if (senderHandler.isDataServer())
          {
            rsHandler.send(genIdMsg);
          }
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
              e.getMessage()));
        }
      }

      // Change the status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : directoryServers.values())
      {
        try
        {
          dsHandler.changeStatusForResetGenId(newGenId);
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(
              baseDn.toString(),
              Integer.toString(dsHandler.getServerId()),
              e.getMessage()));
        }
      }

      // Update every peer (RS/DS) with the potential topology changes (status
      // changes). Rather than doing that each time a DS has a status change
      // (consecutive to the reset gen id message), we prefer advertising once
      // for all after the changes (fewer packets sent), here at the end of the
      // reset msg treatment.
      buildAndSendTopoInfoToDSs(null);
      buildAndSendTopoInfoToRSs();

      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
          Long.toString(newGenId));
      logError(message);
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }

  /**

    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " + getReplicationServer().getServerId() +
          " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
          " for baseDn " + baseDn + ":\n" + csMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }

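    // Process the new status under the domain lock. If the status is invalid
    // we return immediately; the finally block below still releases the lock.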
    try
    {
      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        // Already logged an error in processNewStatus():
        // just return so as not to forward a bad status to the topology
        return;
      }

      // Update every peer (RS/DS) with the topology changes
      buildAndSendTopoInfoToDSs(senderHandler);
      buildAndSendTopoInfoToRSs();

      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          Integer.toString(senderHandler.getServerId()),
          baseDn.toString(),
          newStatus.toString());
      logError(message);
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }

  /**

  {
    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start()
        // method of ServerHandler)
        lock();

          (newStatus == oldStatus) )
      {
        // Change was impossible or already occurred (see StatusAnalyzer
        // comments)
        return false;
      }

      buildAndSendTopoInfoToDSs(serverHandler);
      buildAndSendTopoInfoToRSs();
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
    return false;
  }


      // Try doing job anyway...
    }

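    // The topology information is processed with the domain lock held:
    // unexpected exceptions are logged and the lock is released in the
    // finally block at the end of this method.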
    try
    {
      /*
       * Store the DSs connected to the remote RS and update information about
       * the peer RS
       */
      handler.processTopoInfoFromRS(topoMsg);

      /*
       * Handle generation id
       */
      if (allowResetGenId)
      {
        // Check if the generation id has to be reset
        mayResetGenerationId();
        if (generationId < 0)
          generationId = handler.getGenerationId();
      }

      if (generationId > 0 && (generationId != handler.getGenerationId()))
      {
        Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
            baseDn,
            Integer.toString(handler.getServerId()),
            Long.toString(handler.getGenerationId()),
            Long.toString(generationId));
        logError(message);

        ErrorMsg errorMsg = new ErrorMsg(
            getReplicationServer().getServerId(),
            handler.getServerId(),
            message);
        handler.sendError(errorMsg);
      }

      /*
       * Send the currently known topology information to every connected
       * DS we have.
       */
      buildAndSendTopoInfoToDSs(null);
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }

  /* =======================

      // Consider this producer (DS/db).
      int sid = db.getServerId();

      // Should it be considered for eligibility ?
      ChangeNumber heartbeatLastDN =
          getChangeTimeHeartbeatState().getMaxChangeNumber(sid);

      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
      /*
      if ((heartbeatLastDN != null) &&
          (TimeThread.getTime() - heartbeatLastDN.getTime() > 5000))
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
              " Server " + sid
              + " is not considered for eligibility ... potentially down");
        continue;
      }
      */

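      // A producer is only considered eligible when its server is reachable:
      // either directly connected to this RS as a DS, or reported as connected
      // by one of the peer replication servers.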
      boolean sidConnected = false;
      if (directoryServers.containsKey(sid))
      {
        sidConnected = true;
      }
      else
      {
        // not directly connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        {
          if (rsh.isRemoteLDAPServer(sid))
          {
            sidConnected = true;
            break;
          }
        }
      }
      if (!sidConnected)
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
              " Server " + sid
              + " is not considered for eligibility ... potentially down");
        continue;
      }

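      // Determine this producer's contribution to the eligible CN from the
      // latest change recorded in its changelog db and from the last
      // change-time heartbeat received from it (checked below).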
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
      {

        }
      }

      if ((heartbeatLastDN != null) &&
          ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
      {