国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

網站正在開發(fā)中鄭州谷歌優(yōu)化外包

網站正在開發(fā)中,鄭州谷歌優(yōu)化外包,做彩票網站需要什么,做做網站下載免費Gravitino SparkConnector 實現原理 本文參考了官網介紹,想看官方解析請參考 官網地址 本文僅僅介紹原理 文章目錄 Gravitino SparkConnector 實現原理背景知識-Spark Plugin 介紹(1) **插件加載**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…

Gravitino SparkConnector 實現原理

本文參考了官網介紹,想看官方解析請參考 官網地址 本文僅僅介紹原理

文章目錄

  • Gravitino SparkConnector 實現原理
    • 背景知識-Spark Plugin 介紹
        • (1) **插件加載**
        • (2) **DriverPlugin 初始化**
        • (3) **ExecutorPlugin 初始化**
        • (4) **插件執(zhí)行**
        • (5) **插件銷毀**
    • 背景知識-Driver Plugin 介紹
        • (1) **`init` 方法**
        • (2) **`registerMetrics` 方法**
        • (3) **`onTaskStart` 方法**
        • (4) **`onTaskSucceeded` 方法**
        • (5) **`onTaskFailed` 方法**
        • (6) **`close` 方法**
    • SparkConnector使用方式
      • 加載spark.sql.catalog.xxx 具體執(zhí)行的配置

背景知識-Spark Plugin 介紹

spark在[spark-29399]pr提交更新了SparkPlugin插件

SparkPlugin插件執(zhí)行生命周期

SparkPlugin 的生命周期與 Spark 應用程序的生命周期一致,具體如下:

(1) 插件加載
  • 當 Spark 應用程序啟動時,Spark 會掃描類路徑下的 SparkPlugin 實現類。
  • 如果插件被正確配置(例如通過 spark.plugins 配置項),Spark 會實例化該類。
(2) DriverPlugin 初始化
  • Spark 調用 driverPlugin() 方法,獲取 DriverPlugin 實例。
  • DriverPlugin 的生命周期開始,其方法(如 init、registerMetrics 等)會被調用。
(3) ExecutorPlugin 初始化
  • Spark 調用 executorPlugin() 方法,獲取 ExecutorPlugin 實例。
  • ExecutorPlugin 的生命周期開始,其方法(如 init、shutdown 等)會被調用。
(4) 插件執(zhí)行
  • DriverPlugin 在 Driver 端執(zhí)行自定義邏輯,例如注冊指標、攔截 SQL 解析、修改 Catalog 等。
  • ExecutorPlugin 在 Executor 端執(zhí)行自定義邏輯,例如監(jiān)控 Task 執(zhí)行、收集指標等。
(5) 插件銷毀
  • 當 Spark 應用程序結束時,DriverPluginExecutorPlugin 的生命周期結束,其 close() 方法會被調用以釋放資源。

背景知識-Driver Plugin 介紹

DriverPlugin 是用于在 Driver 端執(zhí)行自定義邏輯的插件,其生命周期方法包括:

(1) init 方法
  • 在 Driver 插件初始化時調用。
  • 可以在此方法中執(zhí)行初始化邏輯,例如注冊自定義 Catalog、攔截 SQL 解析器等。
(2) registerMetrics 方法
  • 在 Driver 插件初始化時調用。
  • 可以在此方法中注冊自定義指標(Metrics)。
(3) onTaskStart 方法
  • 在 Task 啟動時調用。
  • 可以在此方法中執(zhí)行與 Task 相關的邏輯。
(4) onTaskSucceeded 方法
  • 在 Task 成功完成時調用。
  • 可以在此方法中執(zhí)行與 Task 成功相關的邏輯。
(5) onTaskFailed 方法
  • 在 Task 失敗時調用。
  • 可以在此方法中執(zhí)行與 Task 失敗相關的邏輯。
(6) close 方法
  • 在 Driver 插件銷毀時調用。
  • 可以在此方法中釋放資源,例如關閉連接、清理緩存等。

SparkConnector使用方式

./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive

可以看出SparkConnector指定了加載的插件是GravitinoSparkPlugin

public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}

可以看出實現方式很簡單,僅僅使用了一個GravitinoDriverPlugin,也就是在Spark應用程序啟動的時候掃描SparkPlugin掃描到了這個GravitinoSparkPlugin然后立馬就去執(zhí)行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化過程中 插件僅僅覆寫了兩個函數,init()shutdown()。 說明這個插件僅僅做了一些初始化和資源銷毀操作。

在Driver端進行初始化

  1. 配置檢查檢查gravitino_uri和gravitino_metalake是否配置

  2. 如果開啟了iceberg則將gravitinoDriverExtensions放入到數組中方便配置

  3. 初始化Gravtino客戶端和GravitinoCatalogManager,并且將relational類型的表加載到緩存中

  4. 將緩存中的catalog進行如果是非iceberg類型(當前僅僅只有Hive)進行注冊,這里定義的注冊的實際操作配置Spark的配置項(spark.sql.catalog.catalogName)這里的catalogName對應的是緩存中的catalogName,配置的值為根據Gravitino自己的Catalog使用的Provider進行適配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具體情況由適配器進行處理。

  5. 然后注冊SqlExtensions其實就是將第2步驟的數組配置到SPARK_SESSION_EXTENSIONS這個SparkConf配置里面

稍微貼一下注冊Catalog代碼,比較重要

  //初始化的時候調用注冊邏輯,將Gravitino中的Catalog加載到緩存//然后將緩存中的數據作為第二個參數gravitinoCatalogs傳遞進來private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//這里根據適配器去配置spark.sql.catalog.xxx 的具體執(zhí)行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}

到這里GravitinoConnector的代碼機制已經說完了,下面聊聊Spark機制

加載spark.sql.catalog.xxx 具體執(zhí)行的配置

經過上面GravitinoDriverPlugin的初始化之后,已經將具體的catalog名稱和對應的處理類映射起來,這里以GravitinoHiveCatalogSpark33為例。

GravitinoHiveCatalogSpark33這個類繼承關系是繼承了BaseCatalogBaseCatalog是Spark中定義的CatalogPlugin的一個實現類。

Spark在解析SQL的時候會查找catalog對應的Catalog,可以看到調用了CatalogManager.catalog()方法

  private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}

這個catalog方法調用了Catalogs.load()方法

  def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}

這個方法才是真正的加載方法,他真正根據conf配置將GravitinoHiveCatalogSpark33名稱根據定義的反射構造函數實例化到內存中

   def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}

到這里流程就分析結束了

http://aloenet.com.cn/news/39472.html

相關文章:

  • wordpress下拉篩選重慶做seo外包的
  • wordpress科技主題網站排名優(yōu)化公司
  • seo優(yōu)化排名推廣排名優(yōu)化系統(tǒng)
  • 網頁設計模板網站推薦外包網絡推廣公司
  • 上海網站開發(fā)公司外包自學seo能找到工作嗎
  • asp網站制作設計教程佛山網站優(yōu)化軟件
  • 海南省住房和城鄉(xiāng)建設廳網站首頁排名前50名免費的網站
  • 網站建設 云計算搜索數據
  • wordpress企業(yè)網站制作鄭州seo優(yōu)化
  • 這幾年做啥網站致富推廣鏈接讓別人點擊
  • 門戶網站建設如何入賬銅陵seo
  • 美國十大購物網站免費注冊個人網站不花錢
  • 長安東莞網站設計百度掃一掃識別圖片在線
  • logo設計培訓寧波seo網絡推廣優(yōu)化價格
  • 網站網頁設計中怎么添加頁碼信息谷歌海外推廣
  • 網站方案策劃5118營銷大數據
  • wordpress屏蔽垃圾國外ip領碩網站seo優(yōu)化
  • 網站建設服務好公司排名google瀏覽器官網下載
  • 做公司網站計入什么會計科目seo用什么論壇引流
  • 網站實現seo基礎知識考試
  • 怎樣優(yōu)化網站排名靠前泰州百度關鍵詞優(yōu)化
  • 重慶市工程建設信息網2021優(yōu)化關鍵詞的公司
  • 哈爾濱地鐵愛建站seo查詢網站是什么
  • 企業(yè)網站優(yōu)化找哪家搜索排行
  • wordpress本地建站成人零基礎學電腦培訓班
  • 瀚欽科技網站建設谷歌搜索引擎免費
  • 北京建站設計寫一篇軟文1000字
  • 有沒有專門做航拍婚禮網站應用下載app排行榜
  • wordpress動漫博客模板東莞seo靠譜
  • 網頁制作基礎教程第二版seo查詢 站長之家