| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | private ReplicationServer replicationServer; |
| | | /** Shutdown flag, initially {@code false}. */ |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | | /** Tag that starts the key and the data of a generationId record. */ |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | /** Tag that starts the key of an offline replica record. */ |
| | | private static final String OFFLINE_TAG = "OFFLINE"; |
| | | /** Separator placed between the fields of a record key or record data. */ |
| | | private static final String FIELD_SEPARATOR = " "; |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | } |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else if (prefix.equals(OFFLINE_TAG)) |
| | | { |
| | | final String[] str = stringKey.split(FIELD_SEPARATOR, 3); |
| | | final int serverId = toInt(str[1]); |
| | | final DN baseDN = DN.decode(str[2]); |
| | | long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong(); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica offline: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId)); |
| | | } |
| | | else |
| | | { |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | |
| | | final DN baseDN = DN.decode(str[1]); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId); |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(replicaEntry); |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | return replicaDB; |
| | | } |
| | |
| | | * the replica's serverId |
| | | * @return a database entry for the replica |
| | | */ |
| | | Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | static Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | { |
| | | final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | return new SimpleImmutableEntry<String, String>(key, key); |
| | |
| | | * the domain's generationId |
| | | * @return a database entry for the generationId |
| | | */ |
| | | Entry<String, String> toGenIdEntry(DN baseDN, long generationId) |
| | | static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId) |
| | | { |
| | | // NOTE(review): two signatures and two return statements are listed for this |
| | | // method; they look like the String-based and the byte[]-based revisions of |
| | | // the same code shown together - confirm which one applies. |
| | | final String normDn = baseDN.toNormalizedString(); |
| | | // key layout: GENID <normalized baseDN> |
| | | final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn; |
| | | // data layout: GENID <generationId> <normalized baseDN> |
| | | final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + normDn; |
| | | return new SimpleImmutableEntry<String, String>(key, data); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data)); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<String, String> entry) |
| | | throws RuntimeException |
| | | /** |
| | | * Converts an entry made of two strings into the equivalent entry made of two |
| | | * byte arrays, by applying {@code toBytes} to both the key and the value. |
| | | * |
| | | * @param entry |
| | | * the entry to convert |
| | | * @return a new immutable entry holding the converted key and value |
| | | */ |
| | | static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry) |
| | | { |
| | | // Only the raw byte arrays are produced here: no DatabaseEntry is built, the |
| | | // methods that talk to the database wrap the arrays themselves. |
| | | return new SimpleImmutableEntry<byte[], byte[]>( |
| | | toBytes(entry.getKey()), |
| | | toBytes(entry.getValue())); |
| | | } |
| | | |
| | | /** |
| | | * Builds the changelog state database record that remembers when a replica |
| | | * went offline. The key is made of the offline tag, the replica's serverId and |
| | | * the normalized baseDN; the value is the time carried by the offline CSN. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the replica |
| | | * @param offlineCSN |
| | | * carries both the replica's serverId and the time it went offline |
| | | * @return the key/value pair to store for this offline replica |
| | | */ |
| | | static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) |
| | | { |
| | | final String keyString = OFFLINE_TAG + FIELD_SEPARATOR |
| | | + offlineCSN.getServerId() + FIELD_SEPARATOR |
| | | + baseDN.toNormalizedString(); |
| | | final byte[] keyBytes = toBytes(keyString); |
| | | // sized for the single long that makes up the value |
| | | final ByteStringBuilder offlineTime = new ByteStringBuilder(8); |
| | | offlineTime.append(offlineCSN.getTime()); |
| | | final byte[] valueBytes = offlineTime.toByteArray(); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(keyBytes, valueBytes); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(toBytes(entry.getValue())); |
| | | data.setData(entry.getValue()); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("putting record in the changelogstate Db key=[" |
| | | + entry.getKey() + "] value=[" + entry.getValue() + "]"); |
| | | + toString(entry.getKey()) + "] value=[" |
| | | + toString(entry.getValue()) + "]"); |
| | | } |
| | | changelogStateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | // NOTE(review): two opening lines are listed for the same call; they look |
| | | // like the String-keyed and the byte[]-keyed revisions shown together - |
| | | // confirm which one applies. |
| | | // The record to delete is the one built by toReplicaEntry(); the trailing |
| | | // string is the invocation label reused in the debug messages. |
| | | deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId), |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<String, ?> entry, |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | |
| | | try |
| | | { |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | final DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | else if (debugEnabled()) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " failed: key=[ " + entry.getKey() |
| | | + "] not found"); |
| | | } |
| | | debug(methodInvocation + " failed: key not found"); |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | | catch (RuntimeException e) |
| | | { |
| | | throw new ChangelogException(dbe); |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); |
| | | } |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Add the information about an offline replica to the changelog state DB. |
| | | * <p> |
| | | * The record is written without checking for an existing one, so an older |
| | | * offline record for the same replica is replaced. |
| | | * |
| | | * @param baseDN |
| | | * the domain of the offline replica |
| | | * @param offlineCSN |
| | | * the offline replica serverId and offline timestamp |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | // The label names this method so that debug traces can be matched to it. |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "addOfflineReplica(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | } |
| | | |
| | | /** |
| | | * Stores the provided record in the changelog state DB, without first |
| | | * checking whether a record with the same key is already present. |
| | | * |
| | | * @param entry |
| | | * the key and value to store |
| | | * @param methodInvocation |
| | | * label identifying the caller, used as prefix of the debug messages |
| | | * @throws ChangelogException |
| | | * wrapping any RuntimeException raised while storing the record |
| | | */ |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " starting"); |
| | | } |
| | | |
| | | try |
| | | { |
| | | // no explicit transaction is supplied for this write |
| | | changelogStateDb.put(null, |
| | | new DatabaseEntry(entry.getKey()), |
| | | new DatabaseEntry(entry.getValue())); |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " succeeded"); |
| | | } |
| | | } |
| | | catch (RuntimeException cause) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " error: " |
| | | + stackTraceToSingleLineString(cause)); |
| | | } |
| | | throw new ChangelogException(cause); |
| | | } |
| | | } |
| | | |