From 1284f3f2c26a3308d2fe84740d7ab0a9c20cd71a Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 26 Oct 2009 08:49:05 +0000
Subject: [PATCH] Guarantee that release() will always be called on the domain after lock()
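
In the methods of ReplicationServerDomain that take the domain lock,
release() was previously called explicitly on each exit path, so an
exception thrown between lock() and release() could leave the domain
locked. This patch wraps the locked sections in an outer
try / catch(Exception) / finally so that release() runs on every exit
path. A minimal sketch of the pattern as applied here, with
doProtectedWork() as a hypothetical placeholder for each method's
actual body:

    try
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start()
        // method of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // Try doing the job anyway (existing behaviour when interrupted)
      }

      doProtectedWork(); // placeholder for the method-specific work

    }
    catch (Exception e)
    {
      // Log unexpected exceptions instead of letting them escape past release()
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      // Always executed: the domain lock is released on every exit path,
      // including early returns and exceptions
      release();
    }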
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 387 ++++++++++++++++++++++++++++++++++---------------------
1 file changed, 240 insertions(+), 147 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index b60fee8..cabec3a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1142,20 +1142,31 @@
{
try
{
- // Acquire lock on domain (see more details in comment of start()
- // method of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
+ try
+ {
+ // Acquire lock on domain (see more details in comment of start()
+ // method of ServerHandler)
+ lock();
+ } catch (InterruptedException ex)
+ {
+ // Try doing job anyway...
+ }
+ if (otherHandlers.contains(handler))
+ {
+ unRegisterHandler(handler);
+ handler.shutdown();
+ }
}
- if (otherHandlers.contains(handler))
+ catch(Exception e)
{
- unRegisterHandler(handler);
- handler.shutdown();
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ release();
}
}
- release();
}
/**
@@ -2025,86 +2036,96 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In " + this +
- " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
- " for baseDn " + baseDn + ":\n" + genIdMsg);
-
+ "In " + this +
+ " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
+ " for baseDn " + baseDn + ":\n" + genIdMsg);
try
{
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
- }
-
- long newGenId = genIdMsg.getGenerationId();
-
- if (newGenId != this.generationId)
- {
- changeGenerationId(newGenId, false);
- }
- else
- {
- // Order to take a gen id we already have, just ignore
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this
- + " Reset generation id requested for baseDn " + baseDn
- + " but generation id was already " + this.generationId
- + ":\n" + genIdMsg);
- }
-
- // If we are the first replication server warned,
- // then forwards the reset message to the remote replication servers
- for (ServerHandler rsHandler : replicationServers.values())
- {
try
{
- // After we'll have sent the message , the remote RS will adopt
- // the new genId
- rsHandler.setGenerationId(newGenId);
- if (senderHandler.isDataServer())
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ } catch (InterruptedException ex)
+ {
+ // Try doing job anyway...
+ }
+
+ long newGenId = genIdMsg.getGenerationId();
+
+ if (newGenId != this.generationId)
+ {
+ changeGenerationId(newGenId, false);
+ }
+ else
+ {
+ // Ordered to take a gen id we already have: just ignore it
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this
+ + " Reset generation id requested for baseDn " + baseDn
+ + " but generation id was already " + this.generationId
+ + ":\n" + genIdMsg);
+ }
+
+ // If we are the first replication server warned,
+ // then forward the reset message to the remote replication servers
+ for (ServerHandler rsHandler : replicationServers.values())
+ {
+ try
{
- rsHandler.send(genIdMsg);
+ // Once we have sent the message, the remote RS will adopt
+ // the new genId
+ rsHandler.setGenerationId(newGenId);
+ if (senderHandler.isDataServer())
+ {
+ rsHandler.send(genIdMsg);
+ }
+ } catch (IOException e)
+ {
+ logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
+ e.getMessage()));
}
- } catch (IOException e)
- {
- logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
- e.getMessage()));
}
- }
- // Change status of the connected DSs according to the requested new
- // reference generation id
- for (DataServerHandler dsHandler : directoryServers.values())
+ // Change status of the connected DSs according to the requested new
+ // reference generation id
+ for (DataServerHandler dsHandler : directoryServers.values())
+ {
+ try
+ {
+ dsHandler.changeStatusForResetGenId(newGenId);
+ } catch (IOException e)
+ {
+ logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
+ toString(),
+ Integer.toString(dsHandler.getServerId()),
+ e.getMessage()));
+ }
+ }
+
+ // Update every peer (RS/DS) with potential topology changes (status
+ // changes). Rather than doing that each time a DS has a status change
+ // (following the reset gen id message), we prefer advertising once
+ // for all after the changes (fewer packets sent), here at the end of
+ // the reset msg processing.
+ buildAndSendTopoInfoToDSs(null);
+ buildAndSendTopoInfoToRSs();
+
+ Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
+ Long.toString(newGenId));
+ logError(message);
+
+ }
+ catch(Exception e)
{
- try
- {
- dsHandler.changeStatusForResetGenId(newGenId);
- } catch (IOException e)
- {
- logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
- toString(),
- Integer.toString(dsHandler.getServerId()),
- e.getMessage()));
- }
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
}
-
- // Update every peers (RS/DS) with potential topology changes (status
- // change). Rather than doing that each time a DS has a status change
- // (consecutive to reset gen id message), we prefer advertising once for
- // all after changes (less packet sent), here at the end of the reset msg
- // treatment.
- buildAndSendTopoInfoToDSs(null);
- buildAndSendTopoInfoToRSs();
-
- Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
- Long.toString(newGenId));
- logError(message);
-
- release();
+ finally
+ {
+ release();
+ }
}
/**
@@ -2119,41 +2140,51 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In RS " + getReplicationServer().getServerId() +
- " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
- " for baseDn " + baseDn + ":\n" + csMsg);
+ "In RS " + getReplicationServer().getServerId() +
+ " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
+ " for baseDn " + baseDn + ":\n" + csMsg);
}
try
{
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- } catch (InterruptedException ex)
- {
- // Try doing job anyway...
- }
+ try
+ {
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ } catch (InterruptedException ex)
+ {
+ // Try doing job anyway...
+ }
- ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
- if (newStatus == ServerStatus.INVALID_STATUS)
+ ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
+ if (newStatus == ServerStatus.INVALID_STATUS)
+ {
+ // An error was already logged in processNewStatus();
+ // just return so as not to forward a bad status to the topology
+ return;
+ }
+
+ // Update every peer (RS/DS) with topology changes
+ buildAndSendTopoInfoToDSs(senderHandler);
+ buildAndSendTopoInfoToRSs();
+
+ Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
+ Integer.toString(senderHandler.getServerId()),
+ baseDn.toString(),
+ newStatus.toString());
+ logError(message);
+
+ }
+ catch(Exception e)
{
- // Already logged an error in processNewStatus()
- // just return not to forward a bad status to topology
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
release();
- return;
}
-
- // Update every peers (RS/DS) with topology changes
- buildAndSendTopoInfoToDSs(senderHandler);
- buildAndSendTopoInfoToRSs();
-
- Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
- Integer.toString(senderHandler.getServerId()),
- baseDn.toString(),
- newStatus.toString());
- logError(message);
-
- release();
}
/**
@@ -2168,6 +2199,8 @@
{
try
{
+ try
+ {
// Acquire lock on domain (see more details in comment of start() method
// of ServerHandler)
lock();
@@ -2210,7 +2243,6 @@
(newStatus == oldStatus) )
{
// Change was impossible or already occurred (see StatusAnalyzer comments)
- release();
return false;
}
@@ -2218,7 +2250,16 @@
buildAndSendTopoInfoToDSs(serverHandler);
buildAndSendTopoInfoToRSs();
- release();
+ }
+ catch(Exception e)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ release();
+ }
return false;
}
@@ -2338,45 +2379,56 @@
// Try doing job anyway...
}
- /*
- * Store DS connected to remote RS and update information about the peer RS
- */
- handler.processTopoInfoFromRS(topoMsg);
-
- /*
- * Handle generation id
- */
- if (allowResetGenId)
+ try
{
- // Check if generation id has to be resetted
- mayResetGenerationId();
- if (generationId < 0)
- generationId = handler.getGenerationId();
- }
+ /*
+ * Store DS connected to remote RS & update information about the peer RS
+ */
+ handler.processTopoInfoFromRS(topoMsg);
- if (generationId > 0 && (generationId != handler.getGenerationId()))
+ /*
+ * Handle generation id
+ */
+ if (allowResetGenId)
+ {
+ // Check if the generation id has to be reset
+ mayResetGenerationId();
+ if (generationId < 0)
+ generationId = handler.getGenerationId();
+ }
+
+ if (generationId > 0 && (generationId != handler.getGenerationId()))
+ {
+ Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+ baseDn,
+ Integer.toString(handler.getServerId()),
+ Long.toString(handler.getGenerationId()),
+ Long.toString(generationId));
+ logError(message);
+
+ ErrorMsg errorMsg = new ErrorMsg(
+ getReplicationServer().getServerId(),
+ handler.getServerId(),
+ message);
+ handler.sendError(errorMsg);
+ }
+
+ /*
+ * Sends the currently known topology information to every connected
+ * DS we have.
+ */
+ buildAndSendTopoInfoToDSs(null);
+
+ }
+ catch(Exception e)
{
- Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- baseDn,
- Integer.toString(handler.getServerId()),
- Long.toString(handler.getGenerationId()),
- Long.toString(generationId));
- logError(message);
-
- ErrorMsg errorMsg = new ErrorMsg(
- getReplicationServer().getServerId(),
- handler.getServerId(),
- message);
- handler.sendError(errorMsg);
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
}
-
- /*
- * Sends the currently known topology information to every connected
- * DS we have.
- */
- buildAndSendTopoInfoToDSs(null);
-
- release();
+ finally
+ {
+ release();
+ }
}
/* =======================
@@ -3048,6 +3100,50 @@
// Consider this producer (DS/db).
int sid = db.getServerId();
+ // Should it be considered for eligibility?
+ ChangeNumber heartbeatLastDN =
+ getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
+
+ // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
+ // then the domain is considered down and not considered for eligibility
+ /*
+ if ((heartbeatLastDN != null) &&
+ (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " + this.getName() +
+ " Server " + sid
+ + " is not considered for eligibility ... potentially down");
+ continue;
+ }
+ */
+
+ boolean sidConnected = false;
+ if (directoryServers.containsKey(sid))
+ {
+ sidConnected = true;
+ }
+ else
+ {
+ // not directly connected
+ for (ReplicationServerHandler rsh : replicationServers.values())
+ {
+ if (rsh.isRemoteLDAPServer(sid))
+ {
+ sidConnected = true;
+ break;
+ }
+ }
+ }
+ if (!sidConnected)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " + this.getName() +
+ " Server " + sid
+ + " is not considered for eligibility ... potentially down");
+ continue;
+ }
+
ChangeNumber changelogLastCN = db.getLastChange();
if (changelogLastCN != null)
{
@@ -3057,9 +3153,6 @@
}
}
- ChangeNumber heartbeatLastDN =
- getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
-
if ((heartbeatLastDN != null) &&
((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
{
--
Gitblit v1.10.0