国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

三門峽建設(shè)網(wǎng)站哪家好深圳網(wǎng)絡(luò)推廣

三門峽建設(shè)網(wǎng)站哪家好,深圳網(wǎng)絡(luò)推廣,建設(shè)銀行江蘇官網(wǎng)招聘網(wǎng)站,做投標(biāo)需要知道什么網(wǎng)站FlinkSQL的行級權(quán)限解決方案及源碼,支持面向用戶級別的行級數(shù)據(jù)訪問控制,即特定用戶只能訪問授權(quán)過的行,隱藏未授權(quán)的行數(shù)據(jù)。此方案是實(shí)時(shí)領(lǐng)域Flink的解決方案,類似離線數(shù)倉Hive中Ranger Row-level Filter方案。 源碼地址: https…

FlinkSQL的行級權(quán)限解決方案及源碼,支持面向用戶級別的行級數(shù)據(jù)訪問控制,即特定用戶只能訪問授權(quán)過的行,隱藏未授權(quán)的行數(shù)據(jù)。此方案是實(shí)時(shí)領(lǐng)域Flink的解決方案,類似離線數(shù)倉Hive中Ranger Row-level Filter方案。

源碼地址: https://github.com/HamaWhiteGG/flink-sql-security

注: 此方案已產(chǎn)品化集成到實(shí)時(shí)計(jì)算平臺Dinky,歡迎試用。

一、基礎(chǔ)知識

1.1 行級權(quán)限

行級權(quán)限即橫向數(shù)據(jù)安全保護(hù),可以解決不同人員只允許訪問不同數(shù)據(jù)行的問題。例如針對訂單表,用戶A只能查看到北京區(qū)域的數(shù)據(jù),用戶B只能查看到杭州區(qū)域的數(shù)據(jù)。
在這里插入圖片描述

1.2 業(yè)務(wù)流程

1.2.1 設(shè)置行級權(quán)限

管理員配置用戶、表、行級權(quán)限條件,例如下面的配置。

序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
2用戶Bordersregion = ‘hangzhou’

1.2.2 用戶查詢數(shù)據(jù)

用戶在系統(tǒng)上查詢orders表的數(shù)據(jù)時(shí),系統(tǒng)在底層查詢時(shí)會根據(jù)該用戶的行級權(quán)限條件來自動(dòng)過濾數(shù)據(jù),即讓行級權(quán)限生效。

當(dāng)用戶A和用戶B在執(zhí)行下面相同的SQL時(shí),會查看到不同的結(jié)果數(shù)據(jù)。

SELECT * FROM orders;

用戶A查看到的結(jié)果數(shù)據(jù)是:

order_idorder_datecustomer_namepriceproduct_idorder_statusregion
100012020-07-30 10:08:22Jack50.50102falsebeijing
100022020-07-30 10:11:09Sally15.00105falsebeijing

注: 系統(tǒng)底層最終執(zhí)行的SQL是: SELECT * FROM orders WHERE region = 'beijing'


用戶B查看到的結(jié)果數(shù)據(jù)是:

order_idorder_datecustomer_namepriceproduct_idorder_statusregion
100032020-07-30 12:00:30Edward25.25106falsehangzhou
100042022-12-15 12:11:09John78.00103falsehangzhou

注: 系統(tǒng)底層最終執(zhí)行的SQL是: SELECT * FROM orders WHERE region = 'hangzhou' 。

1.3 組件版本

組件名稱版本備注
Flink1.16.0
Flink-connector-mysql-cdc2.3.0

二、Hive行級權(quán)限解決方案

在離線數(shù)倉工具Hive領(lǐng)域,由于發(fā)展多年已有Ranger來支持表數(shù)據(jù)的行級權(quán)限控制,詳見參考文獻(xiàn)[2]。下圖是在Ranger里配置Hive表行級過濾條件的頁面,供參考。
在這里插入圖片描述

但由于Flink實(shí)時(shí)數(shù)倉領(lǐng)域發(fā)展相對較短,Ranger還不支持FlinkSQL,以及要依賴Ranger會導(dǎo)致系統(tǒng)部署和運(yùn)維過重,因此開始自研實(shí)時(shí)數(shù)倉的行級權(quán)限解決工具

三、FlinkSQL行級權(quán)限解決方案

3.1 解決方案

3.1.1 FlinkSQL執(zhí)行流程

可以參考作者文章[FlinkSQL字段血緣解決方案及源碼],本文根據(jù)Flink1.16修正和簡化后的執(zhí)行流程如下圖所示。
在這里插入圖片描述
在CalciteParser.parse()處理后會得到一個(gè)SqlNode類型的抽象語法樹(Abstract Syntax Tree,簡稱AST),本文會在Parse階段,通過組裝行級過濾條件生成新的AST來實(shí)現(xiàn)行級權(quán)限控制。

3.1.2 Calcite對象繼承關(guān)系

下面章節(jié)要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等類,此處進(jìn)行簡單介紹以及展示它們間繼承關(guān)系,以便讀者閱讀本文源碼。

序號介紹
1SqlNodeA SqlNode is a SQL parse tree.
2SqlCallA SqlCall is a call to an SqlOperator operator.
3SqlIdentifierA SqlIdentifier is an identifier, possibly compound.
4SqlJoinParse tree node representing a JOIN clause.
5SqlBasicCallImplementation of SqlCall that keeps its operands in an array.
6SqlSelectA SqlSelect is a node of a parse tree which represents a select statement.

在這里插入圖片描述

3.1.3 解決思路

在Parser階段,如果執(zhí)行的SQL包含對表的查詢操作,則一定會構(gòu)建Calcite SqlSelect對象。因此限制表的行級權(quán)限,只要在構(gòu)建Calcite SqlSelect對象時(shí)對Where條件進(jìn)行攔截即可,而不需要解析用戶執(zhí)行的各種SQL來查找配置過行級權(quán)限條件約束的表。

在SqlSelect對象構(gòu)造Where條件時(shí),要通過執(zhí)行用戶和表名來查找配置的行級權(quán)限條件,系統(tǒng)會把此條件用CalciteParser提供的parseExpression(String sqlExpression)方法解析生成一個(gè)SqlBacicCall再返回。然后結(jié)合用戶執(zhí)行的SQL和配置的行級權(quán)限條件重新組裝Where條件,即生成新的帶行級過濾條件Abstract Syntax Tree,最后基于新的AST再執(zhí)行后續(xù)的Validate、Convert、Optimize和Execute階段。
在這里插入圖片描述

以上整個(gè)過程對執(zhí)行SQL的用戶都是透明和無感知的,還是調(diào)用Flink自帶的TableEnvironment.executeSql(String statement)方法即可。

注: 要通過技術(shù)手段把執(zhí)行用戶傳遞到Calcite SqlSelect中。

3.2 重寫SQL

主要在org.apache.calcite.sql.SqlSelect的構(gòu)造方法中完成。

3.2.1 主要流程

主流程如下圖所示,根據(jù)From的類型進(jìn)行不同的操作,例如針對SqlJoin類型,要分別遍歷其left和right節(jié)點(diǎn),而且要支持遞歸操作以便支持三張表及以上JOIN;針對SqlIdentifier類型,要額外判斷下是否來自JOIN,如果是的話且JOIN時(shí)且未定義表別名,則用表名作為別名;針對SqlBasicCall類型,如果來自于子查詢,說明已在子查詢中組裝過行級權(quán)限條件,則直接返回當(dāng)前Where即可,否則分別取出表名和別名。

然后再獲取行級權(quán)限條件解析后生成SqlBacicCall類型的Permissions,并給Permissions增加別名,最后把已有Where和Permissions進(jìn)行組裝生成新的Where,來作為SqlSelect對象的Where約束。
在這里插入圖片描述

上述流程圖的各個(gè)分支,都會在下面的用例測試章節(jié)中會舉例說明。

3.2.2 核心源碼

核心源碼位于SqlSelect中新增的addCondition()、addPermission()、buildWhereClause()三個(gè)方法,下面只給出控制主流程addCondition()的源碼。

/*** The main process of controlling row-level permissions*/
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {if (from instanceof SqlIdentifier) {String tableName = from.toString();// the table name is used as an alias for joinString tableAlias = fromJoin ? tableName : null;return addPermission(where, tableName, tableAlias);} else if (from instanceof SqlJoin) {SqlJoin sqlJoin = (SqlJoin) from;// support recursive processing, such as join for three tables, process left sqlNodewhere = addCondition(sqlJoin.getLeft(), where, true);// process right sqlNodereturn addCondition(sqlJoin.getRight(), where, true);} else if (from instanceof SqlBasicCall) {// Table has an alias or comes from a subquerySqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();/*** If there is a subquery in the Join, row-level filtering has been appended to the subquery.* What is returned here is the SqlSelect type, just return the original where directly*/if (!(tableNodes[0] instanceof SqlIdentifier)) {return where;}String tableName = tableNodes[0].toString();String tableAlias = tableNodes[1].toString();return addPermission(where, tableName, tableAlias);}return where;
}

四、用例測試

用例測試數(shù)據(jù)來自于CDC Connectors for Apache Flink
[6]官網(wǎng),在此表示感謝。下載本文源碼后,可通過Maven運(yùn)行單元測試。

$ cd flink-sql-security
$ mvn test

4.1 新建Mysql表及初始化數(shù)據(jù)

Mysql新建表語句及初始化數(shù)據(jù)SQL詳見源碼[flink-sql-security/data/database]里面的mysql_ddl.sql和mysql_init.sql文件,本文給orders表增加一個(gè)region字段。

4.2 新建Flink表

4.2.1 新建mysql cdc類型的orders表

DROP TABLE IF EXISTS orders;CREATE TABLE IF NOT EXISTS orders (order_id INT PRIMARY KEY NOT ENFORCED,order_date TIMESTAMP(0),customer_name STRING,product_id INT,price DECIMAL(10, 5),order_status BOOLEAN,region STRING
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='orders'
);

4.2.2 新建mysql cdc類型的products表

DROP TABLE IF EXISTS products;CREATE TABLE IF NOT EXISTS products (id INT PRIMARY KEY NOT ENFORCED,name STRING,description STRING
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='products'
);

4.2.3 新建mysql cdc類型shipments表

DROP TABLE IF EXISTS shipments;CREATE TABLE IF NOT EXISTS shipments (shipment_id INT PRIMARY KEY NOT ENFORCED,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='shipments'
);

4.2.4 新建print類型print_sink表

DROP TABLE IF EXISTS print_sink;CREATE TABLE IF NOT EXISTS print_sink (order_id INT PRIMARY KEY NOT ENFORCED,order_date TIMESTAMP(0),customer_name STRING,product_id INT,price DECIMAL(10, 5),order_status BOOLEAN,region STRING
) WITH ('connector'='print'
);

4.3 測試用例

詳細(xì)測試用例可查看源碼中的單測,下面只描述部分測試點(diǎn)。

4.3.1 簡單SELECT

4.3.1.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
4.3.1.2 輸入SQL
SELECT * FROM orders;
4.3.1.3 輸出SQL
SELECT * FROM orders WHERE region = 'beijing';
4.3.1.4 測試小結(jié)

輸入SQL中沒有WHERE條件,只需要把行級過濾條件region = 'beijing'追加到WHERE后即可。

4.3.2 SELECT帶復(fù)雜WHERE約束

4.3.2.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
4.3.2.2 輸入SQL
SELECT * FROM orders WHERE price > 45.0 OR customer_name = 'John';
4.3.2.3 輸出SQL
SELECT * FROM orders WHERE (price > 45.0 OR customer_name = 'John') AND region = 'beijing';
4.3.2.4 測試小結(jié)

輸入SQL中有兩個(gè)約束條件,中間用的是OR,因此在組裝region = 'beijing'時(shí),要給已有的price > 45.0 OR customer_name = 'John'增加括號。

4.3.3 兩表JOIN且含子查詢

4.3.3.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
4.3.3.2 輸入SQL
SELECTo.*,p.name,p.description
FROM (SELECT*FROM ordersWHERE order_status = FALSE) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHEREo.price > 45.0 OR o.customer_name = 'John' 
4.3.3.3 輸出SQL
SELECTo.*,p.name,p.description
FROM (SELECT*FROM ordersWHERE order_status = FALSE AND region = 'beijing') AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHEREo.price > 45.0 OR o.customer_name = 'John' 
4.3.3.4 測試小結(jié)

針對比較復(fù)雜的SQL,例如兩表在JOIN時(shí)且其中左表來自于子查詢SELECT * FROM orders WHERE order_status = FALSE,行級過濾條件region = 'beijing'只會追加到子查詢的里面。

4.3.4 三表JOIN

4.3.4.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
2用戶Aproductsname = ‘hammer’
3用戶Ashipmentsis_arrived = FALSE
4.3.4.2 輸入SQL
SELECTo.*,p.name,p.description,s.shipment_id,s.origin,s.destination,s.is_arrived
FROMorders AS oLEFT JOIN products AS p ON o.product_id=p.idLEFT JOIN shipments AS s ON o.order_id=s.order_id;
4.3.4.3 輸出SQL
SELECTo.*,p.name,p.description,s.shipment_id,s.origin,s.destination,s.is_arrived
FROMorders AS oLEFT JOIN products AS p ON o.product_id=p.idLEFT JOIN shipments AS s ON o.order_id=s.order_id
WHEREo.region='beijing'AND p.name='hammer'AND s.is_arrived=FALSE;
4.3.4.4 測試小結(jié)

三張表進(jìn)行JOIN時(shí),會分別獲取ordersproducts、shipments三張表的行級權(quán)限條件: region = 'beijing'、name = 'hammer'is_arrived = FALSE,然后增加orders表的別名o、products表的別名p、shipments表的別名s,最后組裝到WHERE子句后面。

4.3.5 INSERT來自帶子查詢的SELECT

4.3.5.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
4.3.5.2 輸入SQL
INSERT INTO print_sink SELECT * FROM (SELECT * FROM orders);
4.3.5.3 輸出SQL
INSERT INTO print_sink (SELECT * FROM (SELECT * FROM orders WHERE region = 'beijing'));
4.3.5.4 測試小結(jié)

無論運(yùn)行SQL類型是INSERT、SELECT或者其他,只會找到查詢oders表的子句,然后對其組裝行級權(quán)限條件。

4.3.6 運(yùn)行SQL

測試兩個(gè)不同用戶執(zhí)行相同的SQL,兩個(gè)用戶的行級權(quán)限條件不一樣。

4.3.6.1 行級權(quán)限條件
序號用戶名表名行級權(quán)限條件
1用戶Aordersregion = ‘beijing’
2用戶Bordersregion = ‘hangzhou’
4.3.6.2 輸入SQL
SELECT * FROM orders;
4.3.6.3 執(zhí)行SQL

用戶A的真實(shí)執(zhí)行SQL:

SELECT * FROM orders WHERE region = 'beijing';

用戶B的真實(shí)執(zhí)行SQL:

SELECT * FROM orders WHERE region = 'hangzhou';
4.3.6.4 測試小結(jié)

用戶調(diào)用下面的執(zhí)行方法,除傳遞要執(zhí)行的SQL參數(shù)外,只需要額外指定執(zhí)行的用戶即可,便能自動(dòng)按照行級權(quán)限限制來執(zhí)行。

/*** Execute the single sql with user permissions*/
public TableResult execute(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);return tableEnv.executeSql(singleSql);
}

五、源碼修改步驟

注: Flink版本1.16.0依賴的Calcite是1.26.0版本。

5.1 新增Parser和ParserImpl類

復(fù)制Flink源碼中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到項(xiàng)目下,新增下面兩個(gè)方法及實(shí)現(xiàn)。

/*** Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.** @param sqlExpression a SQL expression string to parse* @return a parsed SQL node* @throws SqlParserException if an exception is thrown when parsing the statement*/
@Override
public SqlNode parseExpression(String sqlExpression) {CalciteParser parser = calciteParserSupplier.get();return parser.parseExpression(sqlExpression);
}/*** Entry point for parsing SQL queries and return the abstract syntax tree** @param statement the SQL statement to evaluate* @return abstract syntax tree* @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement*/
@Override
public SqlNode parseSql(String statement) {CalciteParser parser = calciteParserSupplier.get();// use parseSqlList here because we need to support statement end with ';' in sql client.SqlNodeList sqlNodeList = parser.parseSqlList(statement);List<SqlNode> parsed = sqlNodeList.getList();Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");return parsed.get(0);
}

5.2 新增SqlSelect類

復(fù)制Calcite源碼中的org.apache.calcite.sql.SqlSelect到項(xiàng)目下,新增上文提到的addCondition()、addPermission()、buildWhereClause()三個(gè)方法。
并且在構(gòu)造方法中注釋掉原有的this.where = where行,并添加如下代碼:

// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;

5.3 封裝SecurityContext類

新建SecurityContext類,主要添加下面三個(gè)方法:

/*** Add row-level filter conditions and return new SQL*/
public String addRowFilter(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);// in the modified SqlSelect, filter conditions will be added to the where clauseSqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);return parsedTree.toString();
}/*** Query the configured permission point according to the user name and table name, and return* it to SqlBasicCall*/
public SqlBasicCall queryPermissions(String username, String tableName) {String permissions = rowLevelPermissions.get(username, tableName);LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);if (permissions != null) {return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);}return null;
}/*** Execute the single sql with user permissions*/
public TableResult execute(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);return tableEnv.executeSql(singleSql);
}

六、下一步計(jì)劃

  1. 支持?jǐn)?shù)據(jù)脫敏(Data Masking)
  2. 開發(fā)ranger-flink-plugin

七、參考文獻(xiàn)

  1. 數(shù)據(jù)管理DMS-敏感數(shù)據(jù)管理-行級管控
  2. Apache Ranger Row-level Filter
  3. OpenLooKeng的行級權(quán)限控制
  4. PostgreSQL中的行級權(quán)限/數(shù)據(jù)權(quán)限/行安全策略
  5. FlinkSQL字段血緣解決方案及源碼
  6. 基于 Flink CDC 構(gòu)建 MySQL 和 Postgres 的 Streaming ETL
http://aloenet.com.cn/news/46575.html

相關(guān)文章:

  • 注冊公司后才可以做獨(dú)立網(wǎng)站嗎個(gè)人主頁網(wǎng)頁設(shè)計(jì)
  • 中國建設(shè)部官方網(wǎng)站魯班獎(jiǎng)網(wǎng)絡(luò)推廣的方法包括
  • 四川城鄉(xiāng)建設(shè)委員會的網(wǎng)站深圳百度競價(jià)托管公司
  • 做網(wǎng)站的空間汽車行業(yè)網(wǎng)站建設(shè)
  • 大型o2o網(wǎng)站開發(fā)時(shí)間怎么建造自己的網(wǎng)站
  • 開網(wǎng)站買自己做的東西什么是淘寶seo
  • 河南建設(shè)廳八大員查詢網(wǎng)站搜索引擎優(yōu)化關(guān)鍵字
  • 紅鵲豆網(wǎng)絡(luò)網(wǎng)站站建設(shè)營銷思路八大要點(diǎn)
  • 怎樣給網(wǎng)站或者商品做推廣抖音seo軟件工具
  • 廈門做網(wǎng)站建設(shè)榜單優(yōu)化
  • 給一個(gè)裝修公司怎么做網(wǎng)站今日新聞播報(bào)
  • 用手機(jī)制作ppt的軟件seo站長綜合查詢工具
  • 哪種語言做的網(wǎng)站好seo診斷站長
  • 福州綠光網(wǎng)站建設(shè)工作室seo sem是啥
  • led燈什么網(wǎng)站做推廣好百度小說排行榜2020
  • 網(wǎng)站空間 價(jià)格網(wǎng)站流量數(shù)據(jù)
  • 專業(yè)的網(wǎng)站建設(shè)商家免費(fèi)網(wǎng)站建設(shè)平臺
  • 網(wǎng)站被劫持怎么辦百度官方版
  • j建設(shè)網(wǎng)站制作網(wǎng)站教學(xué)
  • 專門做服裝批發(fā)的網(wǎng)站有哪些自己怎么做網(wǎng)頁推廣
  • 企業(yè)網(wǎng)站怎么做連接seo搜索引擎優(yōu)化視頻
  • php做動(dòng)漫網(wǎng)站google搜索優(yōu)化
  • 政府網(wǎng)站建設(shè)原則西安seo站內(nèi)優(yōu)化
  • 外國網(wǎng)站學(xué)習(xí)做任務(wù) 升級100大看免費(fèi)行情的軟件
  • 蘭州易天網(wǎng)站建設(shè)公司有哪些cpm廣告聯(lián)盟平臺
  • 租服務(wù)器的網(wǎng)站seo兼職怎么收費(fèi)
  • 淘寶里網(wǎng)站建設(shè)公司可以嗎地推十大推廣app平臺
  • 做網(wǎng)站怎么切圖網(wǎng)站seo收錄工具
  • 威客做的好的網(wǎng)站有哪些建網(wǎng)站哪個(gè)平臺好
  • 網(wǎng)站怎樣做平面設(shè)計(jì)圖百度貼吧入口