Background
In an earlier post, Apache Hudi初探(二)(與flink的結合)–flink寫hudi的操作(JobManager端的提交操作), we mentioned that writing to Hudi involves writing both the actual data and the Hudi metadata. This post walks through the concrete implementation of each.
Writing the actual Hudi data
The work happens in the HoodieFlinkWriteClient.upsert method:
```java
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
      instantTime, table, records.listIterator());
  HoodieWriteMetadata<List<WriteStatus>> result =
      ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}
```
- initTable: initializes the HoodieFlinkTable.
- preWrite: almost nothing happens here.
- getOrCreateWriteHandle: creates a handle for writing the target file (suppose a FlinkMergeAndReplaceHandle is created here); it records the path of the existing base file, which FlinkMergeHelper.runMerge later reads data from. Note the init method called in the constructor: it builds an ExternalSpillableMap to hold the incoming records, and that map is used later during the upsert.
- HoodieFlinkTable.upsert: this is where the real upsert happens. It calls FlinkUpsertDeltaCommitActionExecutor.execute, which eventually reaches BaseFlinkCommitActionExecutor.execute and from there FlinkMergeHelper.newInstance().runMerge:

```java
public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, ...) {
  final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
  HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
  if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
    readSchema = baseFileReader.getSchema();
    gWriter = new GenericDatumWriter<>(readSchema);
    gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
  } else {
    gReader = null;
    gWriter = null;
    readSchema = mergeHandle.getWriterSchemaWithMetaFields();
  }
  wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(),
      new IteratorBasedQueueProducer<>(readerIterator),
      Option.of(new UpdateHandler(mergeHandle)), record -> {
        if (!externalSchemaTransformation) {
          return record;
        }
        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
      });
  wrapper.execute();
  ...
  mergeHandle.close();
}
```
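The wrapper built at the end of runMerge is a BoundedInMemoryExecutor: a producer drains the base-file reader iterator into a bounded in-memory buffer while a consumer (the UpdateHandler) processes each record. Below is a toy, self-contained sketch of that producer/consumer pattern with a plain BlockingQueue; it is an illustration only (the real executor bounds the buffer by estimated record size in bytes, not by element count) and not the actual Hudi class.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

public class BoundedQueueSketch {
  private static final String EOF = "__EOF__"; // sentinel marking the end of the stream

  static void execute(Iterator<String> reader, Consumer<String> handler) throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(4); // bounded buffer

    Thread producer = new Thread(() -> {
      try {
        while (reader.hasNext()) {
          queue.put(reader.next()); // blocks when the buffer is full
        }
        queue.put(EOF);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        for (String rec = queue.take(); !EOF.equals(rec); rec = queue.take()) {
          handler.accept(rec); // plays the role of UpdateHandler.consumeOneRecord
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> baseFileRecords = Arrays.asList("r1", "r2", "r3");
    execute(baseFileRecords.iterator(), rec -> System.out.println("consumed " + rec));
  }
}
```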
A few details of runMerge:
- externalSchemaTransformation: controlled by the hoodie.avro.schema.external.transformation config (default false); it rewrites records that were written under an earlier schema into the new schema (see the Avro sketch after this list).
- wrapper.execute(): this eventually calls upsertHandle.write(record), which is where UpdateHandler.consumeOneRecord gets invoked.
If keyToNewRecords contains the record's key, meaning an update is required, the new data is written:

```java
public void write(GenericRecord oldRecord) {
  ...
  if (keyToNewRecords.containsKey(key)) {
    if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
      copyOldRecord = true;
    } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
      copyOldRecord = false;
    }
    writtenRecordKeys.add(key);
  }
}
```
writeUpdateRecord performs the actual update and records the written key in writtenRecordKeys.
- mergeHandle.close(): writeIncomingRecords here checks whether each incoming record key is already in writtenRecordKeys; if not, the record is written as a plain insert rather than an update:

```java
public List<WriteStatus> close() {
  writeIncomingRecords();
  ...
}
...
protected void writeIncomingRecords() throws IOException {
  // write out any pending records (this can happen when inserts are turned into updates)
  Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
      ? ((ExternalSpillableMap) keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
  while (newRecordsItr.hasNext()) {
    HoodieRecord<T> hoodieRecord = newRecordsItr.next();
    if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
      writeInsertRecord(hoodieRecord);
    }
  }
}
```
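As noted in the externalSchemaTransformation item above, when hoodie.avro.schema.external.transformation is enabled, each old record is re-encoded with its writer schema and decoded with the new reader schema so Avro resolves the evolved fields. Below is a minimal, self-contained Avro sketch of such a rewrite; the schemas are made up for illustration, and this is not Hudi's transformRecordBasedOnNewSchema implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaRewriteSketch {
  public static void main(String[] args) throws IOException {
    // Old schema: only "id"; new schema adds a nullable "name" with a default.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", "uuid-1");

    // Serialize with the writer (old) schema ...
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
    encoder.flush();

    // ... then re-read with the reader (new) schema; the added field gets its default.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord newRecord =
        new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);
    System.out.println(newRecord); // {"id": "uuid-1", "name": null}
  }
}
```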
To summarize the key points of the upsert:
- mergeHandle.close() is where new data is actually written (insert); when the handle is initialized, the incoming records are loaded into keyToNewRecords (in HoodieMergeHandle's init method).
- mergeHandle's write() method handles updates: while replaying the old base file, any record that has a newer incoming version is rewritten with the new data (update).

A toy sketch of this bookkeeping follows.
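The sketch below uses plain Java collections and made-up values instead of the real HoodieMergeHandle / ExternalSpillableMap types; it only illustrates how keyToNewRecords and writtenRecordKeys cooperate.

```java
import java.util.*;

// Toy sketch of the merge bookkeeping described above (not Hudi code):
// incoming records live in keyToNewRecords, every base-file record is offered
// to write(), and close() flushes whatever never matched an existing key.
public class MergeFlowSketch {
  // incoming upserts, keyed by record key (plays the role of the ExternalSpillableMap)
  private final Map<String, String> keyToNewRecords = new HashMap<>();
  private final Set<String> writtenRecordKeys = new HashSet<>();
  private final List<String> newBaseFile = new ArrayList<>();

  MergeFlowSketch(Map<String, String> incoming) {
    keyToNewRecords.putAll(incoming);
  }

  // called once per record of the old base file (like HoodieMergeHandle#write)
  void write(String key, String oldValue) {
    if (keyToNewRecords.containsKey(key)) {
      newBaseFile.add(keyToNewRecords.get(key)); // update path: take the new version
      writtenRecordKeys.add(key);
    } else {
      newBaseFile.add(oldValue);                 // untouched record: copy it over
    }
  }

  // called at the end (like HoodieMergeHandle#close -> writeIncomingRecords)
  List<String> close() {
    for (Map.Entry<String, String> e : keyToNewRecords.entrySet()) {
      if (!writtenRecordKeys.contains(e.getKey())) {
        newBaseFile.add(e.getValue());           // insert path: key never seen in the old file
      }
    }
    return newBaseFile;
  }

  public static void main(String[] args) {
    MergeFlowSketch merge = new MergeFlowSketch(Map.of("k1", "k1-v2", "k3", "k3-v1"));
    merge.write("k1", "k1-v1"); // existing key -> updated
    merge.write("k2", "k2-v1"); // untouched key -> copied
    System.out.println(merge.close()); // [k1-v2, k2-v1, k3-v1]
  }
}
```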
Writing Hudi metadata
The work here is done in the StreamWriteOperatorCoordinator.notifyCheckpointComplete method:
```java
public void notifyCheckpointComplete(long checkpointId) {
  ...
  final boolean committed = commitInstant(this.instant, checkpointId);
  ...
}
...
private boolean commitInstant(String instant, long checkpointId) {
  ...
  doCommit(instant, writeResults);
  ...
}
...
private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;

  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }

    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      this.ckpMetadata.commitInstant(instant);
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}
```
The key method is doCommit(instant, writeResults), invoked from commitInstant. If no errors occurred, it moves on to the next step: the commit itself. This commit process is the same as in Spark; for details see Apache Hudi初探(五)(與spark的結合).
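As a condensed, hedged restatement of the doCommit logic above (toy types, not the actual Hudi/Flink classes): error counts are summed across all WriteStatus results, and the instant is either committed or rolled back.

```java
import java.util.List;

public class CommitDecisionSketch {
  record WriteResult(long totalRecords, long totalErrorRecords) {}

  static boolean doCommit(List<WriteResult> results, boolean ignoreFailed) {
    long totalErrors = results.stream().mapToLong(WriteResult::totalErrorRecords).sum();
    long totalRecords = results.stream().mapToLong(WriteResult::totalRecords).sum();
    boolean hasErrors = totalErrors > 0;
    if (!hasErrors || ignoreFailed) {
      // writeClient.commit(...) would run here; errors are only logged as a warning
      return true;
    }
    // writeClient.rollback(instant) would run here before failing the job
    System.err.printf("Errors/Total=%d/%d, rolling back%n", totalErrors, totalRecords);
    return false;
  }

  public static void main(String[] args) {
    List<WriteResult> results = List.of(new WriteResult(100, 0), new WriteResult(50, 2));
    System.out.println(doCommit(results, false)); // false: 2 errors and failures not ignored
    System.out.println(doCommit(results, true));  // true: errors ignored, commit proceeds
  }
}
```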
Other notes
Where newly written files get their corresponding fileId assigned in Flink and Spark:
- In Flink: BucketAssignFunction.processRecord → getNewRecordLocation assigns the new fileId.
- In Spark: BaseSparkCommitActionExecutor.execute → handleUpsertPartition uses UpsertPartitioner.getBucketInfo; the assignInserts method called from the UpsertPartitioner constructor is where new fileIds are assigned.
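A toy sketch of what "assigning a new fileId" amounts to (hypothetical class and method names, not the Hudi implementation): a record whose key is unknown to the index is routed to a freshly created file group identified by a UUID-style fileId, together with the instant time, while a known key keeps its existing location.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class NewFileIdSketch {
  record RecordLocation(String instantTime, String fileId) {}

  private final Map<String, RecordLocation> index = new HashMap<>(); // recordKey -> location

  RecordLocation locate(String recordKey, String instantTime) {
    // An update keeps its existing file group; an insert opens a new one with a fresh fileId.
    return index.computeIfAbsent(recordKey,
        k -> new RecordLocation(instantTime, UUID.randomUUID().toString()));
  }

  public static void main(String[] args) {
    NewFileIdSketch assigner = new NewFileIdSketch();
    System.out.println(assigner.locate("key-1", "20230801123000000"));
    // the same key resolves to the same file group on a later write
    System.out.println(assigner.locate("key-1", "20230801124500000"));
  }
}
```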