
Background

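In the earlier article Apache Hudi First Look (Part 2) (Integration with Flink): How Flink Writes to Hudi (Commit Operations on the JobManager Side), we mentioned that writing to Hudi involves two parts: writing the actual Hudi data and writing the Hudi metadata. This article walks through how each part is implemented.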

Writing the actual Hudi data

This happens in the HoodieFlinkWriteClient.upsert method:

public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
      instantTime, table, records.listIterator());
  HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}
  • initTable
    Initializes the HoodieFlinkTable.
  • preWrite
    Does almost nothing here.
  • getOrCreateWriteHandle
    Creates a handle for writing files (assume a FlinkMergeAndReplaceHandle is created here). The handle records the path of the existing file, from which FlinkMergeHelper.runMerge later reads data.
    Note the init method called in this constructor: it creates an ExternalSpillableMap to hold the incoming records, which is used later during the upsert.
  • HoodieFlinkTable.upsert
    Performs the actual upsert. It calls FlinkUpsertDeltaCommitActionExecutor.execute, which ultimately calls BaseFlinkCommitActionExecutor.execute and, from there, FlinkMergeHelper.newInstance().runMerge:
      public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, ..) {
        final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
          readSchema = baseFileReader.getSchema();
          gWriter = new GenericDatumWriter<>(readSchema);
          gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
        } else {
          gReader = null;
          gWriter = null;
          readSchema = mergeHandle.getWriterSchemaWithMetaFields();
        }
        wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(),
            new IteratorBasedQueueProducer<>(readerIterator),
            Option.of(new UpdateHandler(mergeHandle)), record -> {
              if (!externalSchemaTransformation) {
                return record;
              }
              return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
            });
        wrapper.execute();
        ...
        mergeHandle.close();
      }
    • externalSchemaTransformation
      Controlled by the hoodie.avro.schema.external.transformation setting (default false), which converts data written under the previous schema into the new schema.
    • wrapper.execute()
      This ultimately invokes UpdateHandler.consumeOneRecord for each record, which in turn calls upsertHandle.write(record):
       public void write(GenericRecord oldRecord) {
         ...
         if (keyToNewRecords.containsKey(key)) {
           if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
             copyOldRecord = true;
           } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
             copyOldRecord = false;
           }
           writtenRecordKeys.add(key);
         }
       }
      
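      If keyToNewRecords contains the record's key, i.e. the record is being updated, the new data is written in place of the old record (unless the merged result is IGNORE_RECORD, in which case the old record is kept).
      writeUpdateRecord performs the update, and writtenRecordKeys keeps track of the keys that have been written.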
    • mergeHandle.close()
       public List<WriteStatus> close() {
         writeIncomingRecords();
         ...
       }
       ...
       protected void writeIncomingRecords() throws IOException {
         // write out any pending records (this can happen when inserts are turned into updates)
         Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
             ? ((ExternalSpillableMap) keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
         while (newRecordsItr.hasNext()) {
           HoodieRecord<T> hoodieRecord = newRecordsItr.next();
           if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
             writeInsertRecord(hoodieRecord);
           }
         }
       }
      
      Here writeIncomingRecords checks each incoming record: if its key is not in writtenRecordKeys, the record is written directly as an insert rather than as an update.
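The wrapper.execute() step is a bounded producer/consumer hand-off: one side reads the old base file (IteratorBasedQueueProducer) and the other side merges records (UpdateHandler), with an in-memory buffer in between. The Java sketch below is a hypothetical, simplified model of that idea, not Hudi's BoundedInMemoryExecutor: the class BoundedExecutorSketch and its execute() method are invented for illustration, and the buffer is bounded by record count through an ArrayBlockingQueue rather than by bytes as with getWriteBufferLimitBytes().

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of a bounded producer/consumer executor (count-bounded, not byte-bounded). */
class BoundedExecutorSketch<I, O> {
    // Sentinel object telling the consumer that the producer has finished.
    private static final Object END = new Object();

    private final BlockingQueue<Object> queue;
    private final Iterator<I> source;
    private final Function<I, O> transform;
    private final Consumer<O> consumer;
    private volatile RuntimeException producerError;

    BoundedExecutorSketch(int capacity, Iterator<I> source,
                          Function<I, O> transform, Consumer<O> consumer) {
        this.queue = new ArrayBlockingQueue<>(capacity); // put() blocks once the buffer is full
        this.source = source;
        this.transform = transform;
        this.consumer = consumer;
    }

    /** Produces on a background thread, consumes on the caller's thread; returns records consumed. */
    @SuppressWarnings("unchecked")
    long execute() {
        Thread producer = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    queue.put(source.next());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                producerError = e; // rethrown on the consumer side below
            } finally {
                try {
                    queue.put(END); // always unblock the consumer
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.setDaemon(true); // do not keep the JVM alive if the consumer fails
        producer.start();

        long consumed = 0;
        try {
            while (true) {
                Object item = queue.take();
                if (item == END) {
                    break;
                }
                // Same role as the record -> transform lambda passed to BoundedInMemoryExecutor.
                consumer.accept(transform.apply((I) item));
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        if (producerError != null) {
            throw producerError;
        }
        return consumed;
    }
}
```

With a capacity of 2, the producer can run at most two records ahead of the consumer, which is the memory-bounding property the real executor gets from its byte limit.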

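To summarize the key points of upsert:

  • When the handle is initialized (the init method in HoodieMergeHandle), the incoming records are loaded into keyToNewRecords.
  • mergeHandle.write() walks the records of the existing file; when an incoming record has the same key, it writes the new data (update) and adds the key to writtenRecordKeys.
  • mergeHandle.close() is where the remaining data is actually written (insert): every incoming record whose key is not in writtenRecordKeys is written as a new record.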
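The three steps above can be modeled in a few lines. This Java sketch is a hypothetical, heavily simplified stand-in for the merge handle, not Hudi code: MergeHandleSketch is an invented name, records are plain key/value strings instead of HoodieRecord/Avro, keyToNewRecords is an ordinary in-memory map rather than an ExternalSpillableMap, and an update simply overwrites the old value (no payload combining and no IGNORE_RECORD handling).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the init / write / close merge flow. */
class MergeHandleSketch {
    // Incoming records, loaded at init time (stands in for keyToNewRecords).
    private final Map<String, String> keyToNewRecords = new LinkedHashMap<>();
    // Keys already written as updates while scanning the old file.
    private final Set<String> writtenRecordKeys = new HashSet<>();
    // Contents of the "new base file", as key=value lines.
    private final List<String> output = new ArrayList<>();

    MergeHandleSketch(Map<String, String> incoming) {
        keyToNewRecords.putAll(incoming); // init: load all incoming records
    }

    /** Called once per record of the existing base file. */
    void write(String key, String oldValue) {
        if (keyToNewRecords.containsKey(key)) {
            output.add(key + "=" + keyToNewRecords.get(key)); // update: new value replaces old
            writtenRecordKeys.add(key);
        } else {
            output.add(key + "=" + oldValue); // untouched: copy the old record
        }
    }

    /** Writes incoming records never matched in the old file (inserts) and returns the result. */
    List<String> close() {
        for (Map.Entry<String, String> e : keyToNewRecords.entrySet()) {
            if (!writtenRecordKeys.contains(e.getKey())) {
                output.add(e.getKey() + "=" + e.getValue());
            }
        }
        return new ArrayList<>(output);
    }
}
```

For example, with incoming records k2=v2-new and k4=v4 and an old file containing k1, k2 and k3, the result is k1=v1, k2=v2-new, k3=v3 followed by k4=v4: k2 becomes an update during write(), while k4 only appears at close().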

Writing Hudi metadata

This happens in the StreamWriteOperatorCoordinator.notifyCheckpointComplete method:

public void notifyCheckpointComplete(long checkpointId) {
  ...
  final boolean committed = commitInstant(this.instant, checkpointId);
  ...
}
...
private boolean commitInstant(String instant, long checkpointId) {
  ...
  doCommit(instant, writeResults);
  ...
}
...
private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;

  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }

    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      this.ckpMetadata.commitInstant(instant);
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}

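The core of commitInstant is the call to doCommit(instant, writeResults).
If no errors occurred (or FlinkOptions.IGNORE_FAILED is enabled), it goes on to writeClient.commit; otherwise it logs the errors, rolls back the instant and throws a HoodieException.
The commit process itself is the same as in Spark; for details, see Apache Hudi First Look (Part 5) (Integration with Spark).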
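The branching in doCommit boils down to: sum the error counts across all write results, commit if there are none, commit anyway if IGNORE_FAILED is set, and roll back otherwise. The Java sketch below models only that decision, under stated assumptions: CommitDecisionSketch, WriteStat and Outcome are hypothetical names, WriteStat stands in for WriteStatus with just its two counters, and the case where writeClient.commit itself returns false is not modeled.

```java
import java.util.List;

/** Hypothetical model of the commit-or-rollback decision made in doCommit. */
class CommitDecisionSketch {
    /** Stand-in for WriteStatus, keeping only the counters doCommit sums up. */
    static final class WriteStat {
        final long totalRecords;
        final long totalErrorRecords;

        WriteStat(long totalRecords, long totalErrorRecords) {
            this.totalRecords = totalRecords;
            this.totalErrorRecords = totalErrorRecords;
        }
    }

    enum Outcome { COMMIT, COMMIT_DESPITE_ERRORS, ROLLBACK }

    /** ignoreFailed plays the role of FlinkOptions.IGNORE_FAILED. */
    static Outcome decide(List<WriteStat> writeResults, boolean ignoreFailed) {
        long totalErrorRecords = writeResults.stream().mapToLong(w -> w.totalErrorRecords).sum();
        if (totalErrorRecords == 0) {
            return Outcome.COMMIT; // normal path: go on to writeClient.commit(...)
        }
        // Errors present: commit only if failures are explicitly ignored, else roll back.
        return ignoreFailed ? Outcome.COMMIT_DESPITE_ERRORS : Outcome.ROLLBACK;
    }

    /** The same "Errors/Total=x/y" figure that doCommit logs. */
    static String summary(List<WriteStat> writeResults) {
        long errors = writeResults.stream().mapToLong(w -> w.totalErrorRecords).sum();
        long total = writeResults.stream().mapToLong(w -> w.totalRecords).sum();
        return "Errors/Total=" + errors + "/" + total;
    }
}
```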

Other notes

Where are newly written files assigned their fileId in Flink and Spark?

// In Flink
BucketAssignFunction: processRecord -> getNewRecordLocation assigns the new fileId
// In Spark
BaseSparkCommitActionExecutor: the execute method calls handleUpsertPartition, which relies on UpsertPartitioner.getBucketInfo
New fileIds are assigned in the assignInserts method, which is called from the UpsertPartitioner constructor.
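Roughly, in both engines new inserts are first packed into existing small files (keeping their fileId) and only the remainder is assigned to new file groups with fresh fileIds. The Java sketch below is a hypothetical, simplified model of that pattern, not Hudi code: FileIdAssignSketch and its assignInserts method are invented, file capacity is expressed as a record count instead of bytes, and new fileIds are assumed to be plain random UUIDs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model: fill small files first, then open new file groups with fresh fileIds. */
class FileIdAssignSketch {
    /** Returns fileId -> number of inserts assigned to it, in assignment order. */
    static Map<String, Long> assignInserts(Map<String, Long> smallFileRecordCounts,
                                           long numInserts, long maxRecordsPerFile) {
        if (maxRecordsPerFile <= 0) {
            throw new IllegalArgumentException("maxRecordsPerFile must be positive");
        }
        Map<String, Long> assignment = new LinkedHashMap<>();
        long remaining = numInserts;

        // 1. Top up existing small files; they keep their existing fileId.
        for (Map.Entry<String, Long> small : smallFileRecordCounts.entrySet()) {
            if (remaining == 0) {
                break;
            }
            long room = maxRecordsPerFile - small.getValue();
            if (room <= 0) {
                continue; // already full, not actually a small file
            }
            long take = Math.min(room, remaining);
            assignment.put(small.getKey(), take);
            remaining -= take;
        }

        // 2. Whatever is left goes to brand-new file groups with newly generated fileIds.
        while (remaining > 0) {
            long take = Math.min(maxRecordsPerFile, remaining);
            assignment.put(UUID.randomUUID().toString(), take);
            remaining -= take;
        }
        return assignment;
    }
}
```

For example, with small files fg-1 (80 records) and fg-2 (100 records), 250 inserts and a limit of 100 per file, fg-1 receives 20 inserts, fg-2 is skipped as full, and three new file groups receive 100, 100 and 30.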