| | |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | |
| | | import com.sleepycat.je.*; |
| | | |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * This class is used to represent a Db environment that can be used |
| | | * to create ReplicationDB. |
| | |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Get the domain base DN/ generationIDs records |
| | | */ |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read tag baseDn generationId=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId; |
| | | try |
| | | { |
| | | // <generationId> |
| | | generationId = new Long(str[1]); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[1] + ">")); |
| | | } |
| | | |
| | | String baseDn = str[2]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read baseDn=" + baseDn |
| | | + " generationId=" + generationId); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true). |
| | | initGenerationID(generationId); |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw("need UTF-8 support")); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | /* |
| | | * Get the server Id / domain base DN records |
| | | */ |
| | | status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData; |
| | | try |
| | | { |
| | | stringData = new String(data.getData(), "UTF-8"); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "need UTF-8 support")); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read serverId BaseDN=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | | if (!str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | int serverId; |
| | | try |
| | | { |
| | | // <serverId> |
| | | serverId = new Integer(str[0]); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[0] + ">")); |
| | | } |
| | | // <baseDn> |
| | | String baseDn = str[1]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read: baseDn=" + baseDn |
| | | + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler( |
| | | serverId, baseDn, replicationServer, this, |
| | | replicationServer.getQueueSize()); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true). |
| | | setDbHandler(serverId, dbHandler); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | readDomainBaseDNGenerationIDRecords(key, data, cursor); |
| | | readServerIdDomainBaseDNRecords(key, data, cursor); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void readDomainBaseDNGenerationIDRecords(DatabaseEntry key, |
| | | DatabaseEntry data, Cursor cursor) throws ReplicationDBException |
| | | { |
| | | /* |
| | | * Get the domain base DN/ generationIDs records |
| | | */ |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData = toString(data.getData()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Read tag baseDn generationId=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId = toLong(str[1]); |
| | | String baseDn = str[2]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Has read baseDn=" + baseDn + " generationId=" + generationId); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true) |
| | | .initGenerationID(generationId); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | } |
| | | |
| | | private void readServerIdDomainBaseDNRecords(DatabaseEntry key, |
| | | DatabaseEntry data, Cursor cursor) throws ReplicationDBException |
| | | { |
| | | /* |
| | | * Get the server Id / domain base DN records |
| | | */ |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData = toString(data.getData()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Read serverId BaseDN=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | | if (!str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | int serverId = toInt(str[0]); |
| | | String baseDn = str[1]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " Has read: baseDn=" + baseDn + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, replicationServer, this, |
| | | replicationServer.getQueueSize()); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true) |
| | | .setDbHandler(serverId, dbHandler); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | } |
| | | |
| | | private int toInt(String data) throws ReplicationDBException |
| | | { |
| | | try |
| | | { |
| | | return Integer.parseInt(data); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message |
| | | .raw("replicationServer state database has a wrong format: " |
| | | + e.getLocalizedMessage() + "<" + data + ">")); |
| | | } |
| | | } |
| | | |
| | | private long toLong(String data) throws ReplicationDBException |
| | | { |
| | | try |
| | | { |
| | | return Long.parseLong(data); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message |
| | | .raw("replicationServer state database has a wrong format: " |
| | | + e.getLocalizedMessage() + "<" + data + ">")); |
| | | } |
| | | } |
| | | |
| | | private String toString(byte[] data) throws ReplicationDBException |
| | | { |
| | | try |
| | | { |
| | | return new String(data, "UTF-8"); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw("need UTF-8 support")); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(int serverId, String baseDn, Long generationId) |
| | | public Database getOrAddDb(int serverId, String baseDn, long generationId) |
| | | throws DatabaseException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | serverId + " " + baseDn + " " + generationId); |
| | | try |
| | | { |
| | | String stringId = serverId + FIELD_SEPARATOR + baseDn; |
| | | String key = serverId + FIELD_SEPARATOR + baseDn; |
| | | |
| | | // Opens the database for the changes received from this server |
| | | // on this domain. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | Database db = dbEnvironment.openDatabase(null, key, dbConfig); |
| | | |
| | | // Creates the record serverId/domain base Dn in the stateDb |
| | | // if it does not already exist. |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getOrAddDb() Created in the state Db record " + |
| | | " serverId/Domain=<"+stringId+">"); |
| | | stateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | putInStateDBIfNotExist(key, key); |
| | | |
| | | // Creates the record domain base Dn/ generationId in the stateDb |
| | | // if it does not already exist. |
| | | stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | generationId.toString() + FIELD_SEPARATOR + baseDn; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | byte[] dataByteId; |
| | | dataByteId = dataStringId.getBytes("UTF-8"); |
| | | key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | data = new DatabaseEntry(); |
| | | status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(dataByteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Created in the state Db record Tag/Domain/GenId key=" + |
| | | stringId + " value=" + dataStringId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + baseDn; |
| | | putInStateDBIfNotExist(key, data); |
| | | return db; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | |
| | | } |
| | | } |
| | | |
| | | private void putInStateDBIfNotExist(String keyString, String dataString) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | byte[] byteId = keyString.getBytes("UTF-8"); |
| | | byte[] dataByteId = dataString.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(dataByteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Created in the state Db record key=[" + keyString |
| | | + "] value=[" + dataString + "]"); |
| | | stateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided generationId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | /** |
| | | * Clears the provided generationId associated to the provided baseDn from the |
| | | * state Db. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which the generationID must be cleared. |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | { |
| | | String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")"; |
| | | |
| | | String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | OperationStatus status = deleteFromStateDB(key, methodInvocation); |
| | | if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST) |
| | | { |
| | | // TODO : should have a better error logging |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId " + baseDn); |
| | | try |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + methodInvocation + " failed " + status); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided serverId associated to the provided baseDn from the |
| | | * state Db. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which the generationID must be cleared. |
| | | * @param serverId |
| | | * The serverId to remove from the Db. |
| | | */ |
| | | public void clearServerId(String baseDn, int serverId) |
| | | { |
| | | String key = serverId + FIELD_SEPARATOR + baseDn; |
| | | deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId=" |
| | | + serverId + ")"); |
| | | } |
| | | |
| | | private OperationStatus deleteFromStateDB(String keyString, |
| | | String methodInvocation) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " " + methodInvocation); |
| | | |
| | | try |
| | | { |
| | | final byte[] byteId = keyString.getBytes("UTF-8"); |
| | | final DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.NOTFOUND) |
| | | { |
| | | // Deletes the record domain base Dn/ generationId in the stateDb |
| | | String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | byte[] byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if ((status == OperationStatus.SUCCESS) || |
| | | (status == OperationStatus.KEYEXIST)) |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | stateDb.delete(txn, key); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId (" + baseDn +") succeeded."); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // TODO : should have a better error logging |
| | | stateDb.delete(txn, key); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId ("+ baseDn + " failed" + status.toString()); |
| | | TRACER.debugInfo(" In " |
| | | + this.replicationServer.getMonitorInstanceName() + " " |
| | | + methodInvocation + " succeeded " + status); |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided serverId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * @param serverId The serverId to remove from the Db. |
| | | * |
| | | */ |
| | | public void clearServerId(String baseDn, int serverId) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId); |
| | | try |
| | | { |
| | | String stringId = serverId + FIELD_SEPARATOR + baseDn; |
| | | |
| | | // Deletes the record serverId/domain base Dn in the stateDb |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.NOTFOUND) |
| | | catch (DatabaseException dbe) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.delete(txn, key); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | " In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearServerId() succeeded " + baseDn + " " + |
| | | serverId); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | return status; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // FIXME can actually happen (see catch above) |
| | | // what should we do about it? |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Clears the database. |