| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | |
| | | private Environment dbEnvironment = null; |
| | | private Database stateDb = null; |
| | | private ReplicationServer replicationServer = null; |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | private static final String FIELD_SEPARATOR = " "; |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Initialize this class. |
| | |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Get the domain base DN/ generationIDs records |
| | | */ |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | short serverId = new Short(str[0]); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read tag baseDn generationId=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId=-1; |
| | | |
| | | DN baseDn; |
| | | |
| | | try |
| | | { |
| | | // <generationId> |
| | | generationId = new Long(str[1]); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[1] + ">")); |
| | | } |
| | | |
| | | // <baseDn> |
| | | baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[2]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | Message message = |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | logError(message); |
| | | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read baseDn=" + baseDn |
| | | + " generationId=" + generationId); |
| | | |
| | | replicationServer.getReplicationCache(baseDn, true). |
| | | setGenerationId(generationId, true); |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw("need UTF-8 support")); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | /* |
| | | * Get the server Id / domain base DN records |
| | | */ |
| | | status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData = null; |
| | | try |
| | | { |
| | | stringData = new String(data.getData(), "UTF-8"); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "need UTF-8 support")); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read serverId BaseDN=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | | if (!str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | short serverId = -1; |
| | | try |
| | | { |
| | | // <serverId> |
| | | serverId = new Short(str[0]); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[0] + ">")); |
| | | } |
| | | // <baseDn> |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | |
| | | } catch (DirectoryException e) |
| | | { |
| | | Message message = |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | logError(message); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read: baseDn=" + baseDn |
| | | + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, replicationServer, this); |
| | | replicationServer.getReplicationCache(baseDn).newDb(serverId, |
| | | dbHandler); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format")); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "need UTF-8 support")); |
| | | new DbHandler(serverId, baseDn, replicationServer, this, 1); |
| | | |
| | | replicationServer.getReplicationCache(baseDn, true). |
| | | setDbHandler(serverId, dbHandler); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | cursor.close(); |
| | | |
| | | } catch (DatabaseException dbe) { |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | cursor.close(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Find or create the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the server. |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | /** |
| | | * Finds or creates the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the domain. |
| | | * @param generationId The generationId associated to this domain. |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn, Long generationId) |
| | | throws DatabaseException |
| | | { |
| | | String stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId; |
| | | |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " + |
| | | serverId + " " + baseDn + " " + generationId); |
| | | try |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | String stringId = serverId.toString() + FIELD_SEPARATOR |
| | | + baseDn.toNormalizedString(); |
| | | |
| | | // Opens the database for the changes received from this server |
| | | // on this domain. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | // Creates the record serverId/domain base Dn in the stateDb |
| | | // if it does not already exist. |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getOrAddDb() Created in the state Db record " + |
| | | " serverId/Domain=<"+stringId+">"); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | // Creates the record domain base Dn/ generationId in the stateDb |
| | | // if it does not already exist. |
| | | stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | generationId.toString() + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | byte[] dataByteId; |
| | | dataByteId = dataStringId.getBytes("UTF-8"); |
| | | key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | data = new DatabaseEntry(); |
| | | status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(dataByteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Created in the state Db record Tag/Domain/GenId key=" + |
| | | stringId + " value=" + dataStringId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | return db; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | return dbEnvironment.beginTransaction(null, null); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided generationId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * |
| | | */ |
| | | public void clearGenerationId(DN baseDn) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId " + baseDn); |
| | | try |
| | | { |
| | | // Deletes the record domain base Dn/ generationId in the stateDb |
| | | String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | byte[] byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if ((status == OperationStatus.SUCCESS) || |
| | | (status == OperationStatus.KEYEXIST)) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | stateDb.delete(txn, key); |
| | | txn.commitWriteNoSync(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId (" + |
| | | baseDn +") succeeded."); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // TODO : should have a better error logging |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId ("+ baseDn + " failed" + status.toString()); |
| | | } |
| | | } |
| | | return db; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | | * NOTE(review): an identical beginTransaction() definition already appears |
| | | * earlier in this file. This copy looks like residue of an older revision; |
| | | * only one of the two can be kept - confirm against the real source. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | return dbEnvironment.beginTransaction(null, null); |
| | | } |
| | | /** |
| | | * Clears the provided serverId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * @param serverId The serverId to remove from the Db. |
| | | * |
| | | */ |
| | | public void clearServerId(DN baseDn, Short serverId) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId); |
| | | try |
| | | { |
| | | String stringId = serverId.toString() + FIELD_SEPARATOR |
| | | + baseDn.toNormalizedString(); |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | // Deletes the record serverId/domain base Dn in the stateDb |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.delete(txn, key); |
| | | txn.commitWriteNoSync(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | " In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearServerId() succeeded " + baseDn + " " + |
| | | serverId); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the database. |
| | | * |
| | | * @param databaseName The name of the database to clear. |
| | | */ |
| | | public final void clearDb(String databaseName) |
| | | { |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "clearDb" + databaseName); |
| | | |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | dbEnvironment.truncateDatabase(txn, databaseName, false); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLEARING_DB.get(databaseName, |
| | | dbe.getLocalizedMessage())); |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | | } |