国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

什么叫網(wǎng)站建設(shè)手機怎么建網(wǎng)站

什么叫網(wǎng)站建設(shè),手機怎么建網(wǎng)站,靠譜企業(yè)網(wǎng)站設(shè)計公司,微信小程序商城怎么開發(fā)DTM 簡介前面章節(jié)提及的MassTransit、dotnetcore/CAP都提供了分布式事務(wù)的處理能力,但也僅局限于Saga和本地消息表模式的實現(xiàn)。那有沒有一個獨立的分布式事務(wù)解決方案,涵蓋多種分布式事務(wù)處理模式,如Saga、TCC、XA模式等。有,目前…

DTM 簡介


前面章節(jié)提及的MassTransit、dotnetcore/CAP都提供了分布式事務(wù)的處理能力,但也僅局限于Saga和本地消息表模式的實現(xiàn)。那有沒有一個獨立的分布式事務(wù)解決方案,涵蓋多種分布式事務(wù)處理模式,如Saga、TCC、XA模式等。有,目前業(yè)界主要有兩種開源方案,其一是阿里開源的Seata,另一個就是DTM。其中Seata僅支持Java、Go和Python語言,因此不在.NET 的選擇范圍。DTM則通過提供簡單易用的HTTP和gRPC接口,屏蔽了語言的無關(guān)性,因此支持任何開發(fā)語言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等語言的SDK。

DTM,全稱Distributed Transaction Manager,是一個分布式事務(wù)管理器,解決跨數(shù)據(jù)庫、跨服務(wù)、跨語言更新數(shù)據(jù)的一致性問題。它提供了Saga、TCC、 XA和二階段消息模式以滿足不同應(yīng)用場景的需求,同時其首創(chuàng)的子事務(wù)屏障技術(shù)可以有效解決冪等、懸掛和空補償?shù)犬惓栴}。

DTM 事務(wù)處理過程及架構(gòu)


那DTM是如何處理分布式事務(wù)的呢?以一個經(jīng)典的跨行轉(zhuǎn)賬業(yè)務(wù)為例來看下事務(wù)處理過程。對于跨行轉(zhuǎn)賬業(yè)務(wù)而言,很顯然是跨庫跨服務(wù)的應(yīng)用場景,不能簡單通過本地事務(wù)解決,可以使用Saga模式,以下是基于DTM提供的Saga事務(wù)模式成功轉(zhuǎn)賬的的時序圖:

從以上時序圖可以看出,DTM整個全局事務(wù)分為如下幾步:

  1. 用戶定義好全局事務(wù)所有的事務(wù)分支(全局事務(wù)的組成部分稱為事務(wù)分支),然后提交給DTM,DTM持久化全局事務(wù)信息后,立即返回

  1. DTM取出第一個事務(wù)分支,這里是TransOut,調(diào)用該服務(wù)并成功返回

  1. DTM取出第二個事務(wù)分支,這里是TransIn,調(diào)用該服務(wù)并成功返回

  1. DTM已完成所有的事務(wù)分支,將全局事務(wù)的狀態(tài)修改為已完成

基于以上這個時序圖的基礎(chǔ)上,再來看下DTM的架構(gòu):

整個DTM架構(gòu)中,一共有三個角色,分別承擔了不同的職責:

  • RM-資源管理器:RM是一個應(yīng)用服務(wù),通常連接到獨立的數(shù)據(jù)庫,負責處理全局事務(wù)中的本地事務(wù),執(zhí)行相關(guān)數(shù)據(jù)的修改、提交、回滾、補償?shù)炔僮?。例如在前面的這個Saga事務(wù)時序圖中,步驟2、3中被調(diào)用的TransIn和TransOut方法所在的服務(wù)都是RM。

  • AP-應(yīng)用程序:AP是一個應(yīng)用服務(wù),負責全局事務(wù)的編排,他會注冊全局事務(wù),注冊子事務(wù),調(diào)用RM接口。例如在前面的這個SAGA事務(wù)中,發(fā)起步驟1的是AP,它編排了一個包含TransOut、TransIn的全局事務(wù),然后提交給TM

  • TM-事務(wù)管理器:TM就是DTM服務(wù),負責全局事務(wù)的管理,作為一個獨立的服務(wù)而存在。每個全局事務(wù)都注冊到TM,每個事務(wù)分支也注冊到TM。TM會協(xié)調(diào)所有的RM來執(zhí)行不同的事務(wù)分支,并根據(jù)執(zhí)行結(jié)果決定是否提交或回滾事務(wù)。例如在前面的Saga事務(wù)時序圖中,TM在步驟2、3中調(diào)用了各個RM,在步驟4中,完成這個全局事務(wù)。

總體而言,AP-應(yīng)用程序充當全局事務(wù)編排器的角色通過DTM提供的開箱即用的SDK進行全局事務(wù)和子事務(wù)的注冊。TM-事務(wù)管理器接收到注冊的全局事務(wù)和子事務(wù)后,負責調(diào)用RM-資源管理器來執(zhí)行對應(yīng)的事務(wù)分支,TM-事務(wù)管理器根據(jù)事務(wù)分支的執(zhí)行結(jié)果決定是否提及或回滾事務(wù)。

快速上手


百聞不如一見,接下來就來實際上手體驗下如何基于DTM來實際應(yīng)用Saga進行分布式跨行轉(zhuǎn)賬事務(wù)的處理。

創(chuàng)建示例項目


接下來就來創(chuàng)建一個示例項目:

  1. 使用dotnet new webapi -n DtmDemo.Webapi創(chuàng)建示例項目。

  1. 添加Nuget包:DtmcliPomelo.EntityFrameworkCore.MySql

  1. 添加DTM配置項:

{"dtm": {"DtmUrl": "http://localhost:36789","DtmTimeout": 10000,"BranchTimeout": 10000,"DBType": "mysql","BarrierTableName": "dtm_barrier.barrier",}
}
  1. 定義銀行賬戶BankAccount實體類:

namespaceDtmDemo.WebApi.Models
{publicclassBankAccount{publicint Id { get; set; }publicdecimal Balance { get; set; }}
}
  1. 定義DtmDemoWebApiContext數(shù)據(jù)庫上下文:

using Microsoft.EntityFrameworkCore;namespaceDtmDemo.WebApi.Data
{publicclassDtmDemoWebApiContext : DbContext{publicDtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options): base(options){}public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;}
}
  1. 注冊DbContext 和DTM服務(wù):

using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注冊DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});// 注冊DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
  1. 執(zhí)行dotnet ef migrations add 'Initial' 創(chuàng)建遷移。

  1. 為便于初始化演示數(shù)據(jù),定義BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自動應(yīng)用遷移。

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;namespaceDtmDemo.WebApi.Controllers
{[Route("api/[controller]")][ApiController]publicclassBankAccountsController : ControllerBase{privatereadonly DtmDemoWebApiContext _context;publicBankAccountsController(DtmDemoWebApiContext context){_context = context;}[HttpGet]publicasync Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount(){returnawait _context.BankAccount.ToListAsync();}[HttpPost]publicasync Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount){await _context.Database.MigrateAsync();_context.BankAccount.Add(bankAccount);await _context.SaveChangesAsync();return Ok(bankAccount);}
}

應(yīng)用Saga模式


接下來定義SagaDemoController來使用DTM的Saga模式來模擬跨行轉(zhuǎn)賬分布式事務(wù):

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;namespaceDtmDemo.WebApi.Controllers
{[Route("api/[controller]")][ApiController]publicclassSagaDemoController : ControllerBase{privatereadonly DtmDemoWebApiContext _context;privatereadonly IConfiguration _configuration;privatereadonly IDtmClient _dtmClient;privatereadonly IDtmTransFactory _transFactory;privatereadonly IBranchBarrierFactory _barrierFactory;privatereadonly ILogger<BankAccountsController> _logger;publicSagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory){this._context = context;this._configuration = configuration;this._dtmClient = dtmClient;this._transFactory = transFactory;this._logger = logger;this._barrierFactory = barrierFactory;}
}

對于跨行轉(zhuǎn)賬業(yè)務(wù),使用DTM的Saga模式,首先要進行事務(wù)拆分,可以拆分為以下4個子事務(wù),并分別實現(xiàn):

轉(zhuǎn)出子事務(wù)(TransferOut)


    [HttpPost("TransferOut")]publicasync Task<IActionResult> TransferOut([FromBody] TransferRequest request){var msg = $"用戶{request.UserId}轉(zhuǎn)出{request.Amount}元";_logger.LogInformation($"轉(zhuǎn)出子事務(wù)-啟動:{msg}");// 1. 創(chuàng)建子事務(wù)屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn = _context.Database.GetDbConnection()){// 2. 在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"轉(zhuǎn)出子事務(wù)-執(zhí)行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null || bankAccount.Balance < request.Amount)thrownew InvalidDataException("賬戶不存在或余額不足!");bankAccount.Balance -= request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($"轉(zhuǎn)出子事務(wù)-失敗:{ex.Message}");// 3. 按照接口協(xié)議,返回409,以表示子事務(wù)失敗returnnew StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($"轉(zhuǎn)出子事務(wù)-成功:{msg}");return Ok();}

以上代碼中有幾點需要額外注意:

  1. 使用Saga模式,必須開啟子事務(wù)屏障:_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的參數(shù)由DTM 生成,類似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四個參數(shù):

  1. gid:全局事務(wù)Id

  1. trans_type:事務(wù)類型,是saga、msg、xa或者是tcc。

  1. branch_id:子事務(wù)的Id

  1. op:當前操作,對于Saga事務(wù)模式,要么為action(正向操作),要么為compensate(補償操作)。

  1. 必須在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作:branchBarrier.Call(conn, async (tx) =>{}

  1. 對于Saga正向操作而言,業(yè)務(wù)上的失敗與異常是需要做嚴格區(qū)分的,例如前面的余額不足,是業(yè)務(wù)上的失敗,必須回滾。而對于網(wǎng)絡(luò)抖動等其他外界原因?qū)е碌氖聞?wù)失敗,屬于業(yè)務(wù)異常,則需要重試。因此若因業(yè)務(wù)失敗(這里是賬戶不存在或余額不足)而導致子事務(wù)失敗,則必須通過拋異常的方式并返回**409**狀態(tài)碼以告知DTM 子事務(wù)失敗。

  1. 以上通過拋出異常的方式中斷子事務(wù)執(zhí)行并在外圍捕獲特定異常返回409狀態(tài)碼。在外圍捕獲異常時切忌放大異常捕獲,比如直接catch(Exception),如此會捕獲由于網(wǎng)絡(luò)等其他原因?qū)е碌漠惓?#xff0c;而導致DTM 不再自動處理該異常,比如業(yè)務(wù)異常時的自動重試。

轉(zhuǎn)出補償子事務(wù)(TransferOut_Compensate)


轉(zhuǎn)出補償,就是回滾轉(zhuǎn)出操作,進行賬戶余額歸還,實現(xiàn)如下:

    [HttpPost("TransferOut_Compensate")]publicasync Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request){var msg = $"用戶{request.UserId}回滾轉(zhuǎn)出{request.Amount}元";_logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-啟動:{msg}");// 1. 創(chuàng)建子事務(wù)屏障var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn = _context.Database.GetDbConnection()){// 在子事務(wù)屏障內(nèi)執(zhí)行事務(wù)操作await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-執(zhí)行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null)return; //對于補償操作,可直接返回,中斷后續(xù)操作bankAccount.Balance += request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($"轉(zhuǎn)出補償子事務(wù)-成功!");// 2. 因補償操作必須成功,所以必須返回200。return Ok();}

由于DTM設(shè)計為總是執(zhí)行補償,也就是說即使正向操作子事務(wù)失敗時,DTM 仍舊會執(zhí)行補償邏輯。但子事務(wù)屏障會在執(zhí)行時判斷正向操作的執(zhí)行狀態(tài),當子事務(wù)失敗時,并不會執(zhí)行補償邏輯。

另外DTM的補償操作,是要求最終成功的,只要還沒成功,就會不斷進行重試,直到成功。因此在補償子事務(wù)中,即使補償子事務(wù)中出現(xiàn)業(yè)務(wù)失敗時,也必須返回**200**因此當出現(xiàn)bankAccount==null時可以直接 return。

轉(zhuǎn)入子事務(wù)(TransferIn)


轉(zhuǎn)入子事務(wù)和轉(zhuǎn)出子事務(wù)的實現(xiàn)基本類似,都是開啟子事務(wù)屏障后,在branchBarrier.Call(conn, async tx => {}中實現(xiàn)事務(wù)邏輯,并通過拋異常的方式并最終返回409狀態(tài)碼來顯式告知DTM 子事務(wù)執(zhí)行失敗。

    [HttpPost("TransferIn")]publicasync Task<IActionResult> TransferIn([FromBody] TransferRequest request){var msg = $"用戶{request.UserId}轉(zhuǎn)入{request.Amount}元";_logger.LogInformation($"轉(zhuǎn)入子事務(wù)-啟動:{msg}");var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);try{using (var conn = _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"轉(zhuǎn)入子事務(wù)-執(zhí)行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null)thrownew InvalidDataException("賬戶不存在!");bankAccount.Balance += request.Amount;await _context.SaveChangesAsync();});}}catch (InvalidDataException ex){_logger.LogInformation($"轉(zhuǎn)入子事務(wù)-失敗:{ex.Message}");returnnew StatusCodeResult(StatusCodes.Status409Conflict);}_logger.LogInformation($"轉(zhuǎn)入子事務(wù)-成功:{msg}");return Ok();}

轉(zhuǎn)入補償子事務(wù)(TransferIn_Compensate)


轉(zhuǎn)入補償子事務(wù)和轉(zhuǎn)出補償子事務(wù)的實現(xiàn)也基本類似,都是開啟子事務(wù)屏障后,在branchBarrier.Call(conn, async tx => {}中實現(xiàn)事務(wù)邏輯,并最終返回200狀態(tài)碼來告知DTM 補償子事務(wù)執(zhí)行成功。

    [HttpPost("TransferIn_Compensate")]publicasync Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request){var msg = "用戶{request.UserId}回滾轉(zhuǎn)入{request.Amount}元";_logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-啟動:{msg}");var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);using (var conn = _context.Database.GetDbConnection()){await branchBarrier.Call(conn, async (tx) =>{_logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-執(zhí)行:{msg}");await _context.Database.UseTransactionAsync(tx);var bankAccount = await _context.BankAccount.FindAsync(request.UserId);if (bankAccount == null) return;bankAccount.Balance -= request.Amount;await _context.SaveChangesAsync();});}_logger.LogInformation($"轉(zhuǎn)入補償子事務(wù)-成功!");return Ok();}

編排Saga事務(wù)


拆分完子事務(wù),最后就可以進行Saga事務(wù)編排了,其代碼如下所示:

    [HttpPost("Transfer")]publicasync Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,CancellationToken cancellationToken){try{_logger.LogInformation($"轉(zhuǎn)賬事務(wù)-啟動:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}");//1. 生成全局事務(wù)IDvar gid = await _dtmClient.GenGid(cancellationToken);var bizUrl = _configuration.GetValue<string>("TransferBaseURL");//2. 創(chuàng)建Sagavar saga = _transFactory.NewSaga(gid);//3. 添加子事務(wù)saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",new TransferRequest(fromUserId, amount)).Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",new TransferRequest(toUserId, amount)).EnableWaitResult(); // 4. 按需啟用是否等待事務(wù)執(zhí)行結(jié)果//5. 提交Saga事務(wù)await saga.Submit(cancellationToken);}catch (DtmException ex) // 6. 如果開啟了`EnableWaitResult()`,則可通過捕獲異常的方式,捕獲事務(wù)失敗的結(jié)果。{_logger.LogError($"轉(zhuǎn)賬事務(wù)-失敗:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}失敗!");returnnew BadRequestObjectResult($"轉(zhuǎn)賬失敗:{ex.Message}");}_logger.LogError($"轉(zhuǎn)賬事務(wù)-完成:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}成功!");return Ok($"轉(zhuǎn)賬事務(wù)-完成:用戶{fromUserId}轉(zhuǎn)賬{amount}元到用戶{toUserId}成功!");}

主要步驟如下:

  1. 生成全局事務(wù)Id:var gid =await _dtmClient.GenGid(cancellationToken);

  1. 創(chuàng)建Saga全局事務(wù):_transFactory.NewSaga(gid);

  1. 添加子事務(wù):saga.Add(string action, string compensate, object postData);包含正向和反向子事務(wù)。

  1. 如果依賴事務(wù)執(zhí)行結(jié)果,可通過EnableWaitResult()開啟事務(wù)結(jié)果等待。

  1. 提交Saga全局事務(wù):saga.Submit(cancellationToken);

  1. 若開啟了事務(wù)結(jié)果等待,可以通過try...catch..來捕獲DtmExcepiton異常來獲取事務(wù)執(zhí)行異常信息。

運行項目


既然DTM作為一個獨立的服務(wù)存在,其負責通過HTTP或gRPC協(xié)議發(fā)起子事務(wù)的調(diào)用,因此首先需要啟動一個DTM實例,又由于本項目依賴MySQL,因此我們采用Docker Compose的方式來啟動項目。在Visual Studio中通過右鍵項目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/buildFROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publishFROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]

在Visual Studio中通過右鍵項目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整個項目依賴mysql和DTM,修改docker-compose.yml如下所示,其中定義了三個服務(wù):db,dtm和dtmdemo.webapi。

version:'3.4'services:db:image:'mysql:5.7'container_name:dtm-mysqlenvironment:MYSQL_ROOT_PASSWORD:123456# 指定MySQL初始密碼volumes:-./docker/mysql/scripts:/docker-entrypoint-initdb.d# 掛載用于初始化數(shù)據(jù)庫的腳本ports:-'3306:3306'dtm:depends_on: ["db"]image:'yedf/dtm:latest'container_name:dtm-svcenvironment:IS_DOCKER:'1'STORE_DRIVER:mysql# 指定使用MySQL持久化DTM事務(wù)數(shù)據(jù)STORE_HOST:db# 指定MySQL服務(wù)名,這里是dbSTORE_USER:rootSTORE_PASSWORD:'123456'STORE_PORT:3306STORE_DB:"dtm"# 指定DTM 數(shù)據(jù)庫名ports:-'36789:36789'# DTM HTTP 端口-'36790:36790'# DTM gRPC 端口dtmdemo.webapi:depends_on: ["dtm", "db"]image:${DOCKER_REGISTRY-}dtmdemowebapienvironment:ASPNETCORE_ENVIRONMENT:docker# 設(shè)定啟動環(huán)境為dockercontainer_name:dtm-webapi-demobuild:context:.dockerfile:DtmDemo.WebApi/Dockerfileports:-'31293:80'# 映射Demo:80端口到本地31293端口-'31294:443'	 # 映射Demo:443端口到本地31294端口

其中dtmdemo.webapi服務(wù)通過ASPNETCORE_ENVIRONMENT: docker指定啟動環(huán)境為docker,因此需要在項目下添加appsettings.docker.json以配置應(yīng)用參數(shù):

{"ConnectionStrings": {"DtmDemoWebApiContext":"Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"},"TransferBaseURL":"http://dtmdemo.webapi/api/SagaDemo","dtm": {"DtmUrl":"http://dtm:36789","DtmTimeout":10000,"BranchTimeout":10000,"DBType":"mysql","BarrierTableName":"dtm_barrier.barrier"}
}

另外db服務(wù)中通過volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"]來掛載初始化腳本,以創(chuàng)建DTM依賴的MySQL 存儲數(shù)據(jù)庫dtm和示例項目使用子事務(wù)屏障需要的barrier數(shù)據(jù)表。腳本如下:

CREATE DATABASE IF NOTEXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable IF EXISTS dtm.trans_global;
CREATETABLE if notEXISTS dtm.trans_global (`id` bigint(22) NOTNULL AUTO_INCREMENT,`gid` varchar(128) NOTNULL COMMENT 'global transaction id',`trans_type` varchar(45) notnull COMMENT 'transaction type: saga | xa | tcc | msg',`status` varchar(12) NOTNULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',`query_prepared` varchar(1024) NOTNULL COMMENT 'url to check for msg|workflow',`protocol` varchar(45) notnull comment 'protocol: http | grpc | json-rpc',`create_time` datetime DEFAULTNULL,`update_time` datetime DEFAULTNULL,`finish_time` datetime DEFAULTNULL,`rollback_time` datetime DEFAULTNULL,`options` varchar(1024) DEFAULT'options for transaction like: TimeoutToFail, RequestTimeout',`custom_data` varchar(1024) DEFAULT'' COMMENT 'custom data for transaction',`next_cron_interval` int(11) defaultnull comment 'next cron interval. for use of cron job',`next_cron_time` datetime defaultnull comment 'next time to process this trans. for use of cron job',`owner` varchar(128) notnulldefault'' comment 'who is locking this trans',`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',`result` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',`rollback_reason` varchar(1024) DEFAULT'' COMMENT 'rollback reason for transaction',PRIMARY KEY (`id`),UNIQUE KEY `gid` (`gid`),key `owner`(`owner`),key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.trans_branch_op;
CREATETABLE IF NOTEXISTS dtm.trans_branch_op (`id` bigint(22) NOTNULL AUTO_INCREMENT,`gid` varchar(128) NOTNULL COMMENT 'global transaction id',`url` varchar(1024) NOTNULL COMMENT 'the url of this op',`data` TEXT COMMENT 'request body, depreceated',`bin_data` BLOB COMMENT 'request body',`branch_id` VARCHAR(128) NOTNULL COMMENT 'transaction branch ID',`op` varchar(45) NOTNULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',`status` varchar(45) NOTNULL COMMENT 'transaction op status: prepared | succeed | failed',`finish_time` datetime DEFAULTNULL,`rollback_time` datetime DEFAULTNULL,`create_time` datetime DEFAULTNULL,`update_time` datetime DEFAULTNULL,PRIMARY KEY (`id`),UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
droptable IF EXISTS dtm.kv;
CREATETABLE IF NOTEXISTS dtm.kv (`id` bigint(22) NOTNULL AUTO_INCREMENT,`cat` varchar(45) NOTNULL COMMENT 'the category of this data',`k` varchar(128) NOTNULL,`v` TEXT,`version` bigint(22) default1 COMMENT 'version of the value',create_time datetime defaultNULL,update_time datetime DEFAULTNULL,PRIMARY KEY (`id`),UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
create database if notexists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
droptable if exists dtm_barrier.barrier;
createtable if notexists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default'',gid varchar(128) default'',branch_id varchar(128) default'',op varchar(45) default'',barrier_id varchar(45) default'',reason varchar(45) default'' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
);

準備完畢,即可通過點擊Visual Studio工具欄的Docker Compose的啟動按鈕,啟動后可以在Containers窗口看到啟動了dtm-mysql、dtm-svc和dtm-webapi-demo三個容器,并在瀏覽器中打開了 http://localhost:31293/swagger/index.html Swagger 網(wǎng)頁。該種方式啟動項目是支持斷點調(diào)試項目,如下圖所示:

通過BankAccouts控制器的POST接口,初始化用戶1和用戶2各100元。再通過SagaDemo控制器的/api/Transfer接口,進行Saga事務(wù)測試。

  1. 用戶1轉(zhuǎn)賬10元到用戶2

由于用戶1和用戶2已存在,且用戶1余額足夠, 因此該筆轉(zhuǎn)賬合法因此會成功,其執(zhí)行路徑為:轉(zhuǎn)出(成功)->轉(zhuǎn)入(成功)-> 事務(wù)完成,執(zhí)行日志如下圖所示:

  1. 用戶3轉(zhuǎn)賬10元到用戶1

由于用戶3不存在,因此執(zhí)行路徑為:轉(zhuǎn)出(失敗)->轉(zhuǎn)出補償(成功)->事務(wù)完成。從下圖的執(zhí)行日志可以看出,轉(zhuǎn)出子事務(wù)失敗,還是會調(diào)用對應(yīng)的轉(zhuǎn)出補償操作,但子事務(wù)屏障會過進行過濾,因此實際上并不會執(zhí)行真正的轉(zhuǎn)出補償邏輯,其中紅線框住的部分就是證明。

  1. 用戶1轉(zhuǎn)賬10元到用戶3

由于用戶3不存在,因此執(zhí)行路徑為:轉(zhuǎn)出(成功)->轉(zhuǎn)入(失敗)->轉(zhuǎn)入補償(成功)->轉(zhuǎn)出補償(成功)->事務(wù)完成。從下圖的執(zhí)行日志可以看出,轉(zhuǎn)入子事務(wù)失敗,還是會調(diào)用對應(yīng)的轉(zhuǎn)入補償操作,但子事務(wù)屏障會過進行過濾,因此實際上并不會執(zhí)行真正的轉(zhuǎn)入補償邏輯,其中紅線框住的部分就是證明。

子事務(wù)屏障


在以上的示例中,重復提及子事務(wù)屏障,那子事務(wù)屏障具體是什么,這里有必要重點說明下。以上面用戶1轉(zhuǎn)賬10元到用戶3為例,整個事務(wù)流轉(zhuǎn)過程中,即轉(zhuǎn)出(成功)->轉(zhuǎn)入(失敗)->轉(zhuǎn)入補償(成功)->轉(zhuǎn)出補償(成功)->事務(wù)完成。

在提交事務(wù)之后,首先是全局事務(wù)的落庫,主要由DTM 服務(wù)負責,主要包括兩張表:trans_global和trans_branch_op,DTM 依此進行子事務(wù)分支的協(xié)調(diào)。其中trans_global會插入一條全局事務(wù)記錄,用于記錄全局事務(wù)的狀態(tài)信息,如下圖1所示。trans_branch_op表為trans_global的子表,記錄四條子事務(wù)分支數(shù)據(jù),如下圖2所示:

具體的服務(wù)再接收到來自Dtm的子事務(wù)分支調(diào)用時,每次都會往子事務(wù)屏障表barrier中插入一條數(shù)據(jù),如下圖所示。業(yè)務(wù)服務(wù)就是依賴此表來完成子事務(wù)的控制。

而子事務(wù)屏障的核心就是子事務(wù)屏障表唯一鍵的設(shè)計,以gid、branch_id、op和barrier_id為唯一索引,利用唯一索引,“以改代查”來避免競態(tài)條件。在跨行轉(zhuǎn)賬的Saga示例中,子事務(wù)分支的執(zhí)行步驟如下所示:

  1. 開啟本地事務(wù)

  1. 對于當前操作op(action|compensate),使用inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)向子事務(wù)屏障表插入一條數(shù)據(jù),有幾種情況:

  1. 插入成功且影響條數(shù)大于0,則繼續(xù)向下執(zhí)行。

  1. 插入成功但影響條數(shù)等于0,說明觸發(fā)唯一鍵約束,此時會進行空補償、懸掛和重復請求判斷,若是則直接返回,跳過后續(xù)子事務(wù)分支邏輯的執(zhí)行。

  1. 第2步插入成功,則可以繼續(xù)執(zhí)行子事務(wù)分支邏輯,執(zhí)行業(yè)務(wù)數(shù)據(jù)表操作,結(jié)果分兩種請求

  1. 子事務(wù)成功,子事務(wù)屏障表操作和業(yè)務(wù)數(shù)據(jù)表操作由于共享同一個本地事務(wù),提交本地事務(wù),因此可實現(xiàn)強一致性,當前子事務(wù)分支完成。

  1. 子事務(wù)失敗,回滾本地事務(wù)

每個子事務(wù)分支通過以上步驟,即可實現(xiàn)下圖的效果:

小結(jié)


本文主要介紹了DTM的Saga模式的應(yīng)用,基于DTM 首創(chuàng)的子事務(wù)屏障技術(shù),使得開發(fā)者基于DTM 提供的SDK能夠輕松開發(fā)出更可靠的分布式應(yīng)用,徹底將開發(fā)人員從網(wǎng)絡(luò)異常的處理中解放出來,再也不用擔心空補償、防懸掛、冪等等分布式問題。如果要進行分布式事務(wù)框架的選型,DTM 將是不二之選。

http://aloenet.com.cn/news/30002.html

相關(guān)文章:

  • 網(wǎng)站架設(shè)客服QQ百度熱搜 百度指數(shù)
  • 教育企業(yè)重慶網(wǎng)站建設(shè)動態(tài)網(wǎng)站設(shè)計畢業(yè)論文
  • 鄂爾多斯網(wǎng)站建設(shè)百度提交網(wǎng)站入口
  • 城鄉(xiāng)建設(shè)網(wǎng)站報建需要什么網(wǎng)絡(luò)產(chǎn)品及其推廣方法
  • 搭建網(wǎng)站找什么公司銷售網(wǎng)站排名
  • 網(wǎng)站建設(shè) 關(guān)于我們百度 seo 工具
  • 專業(yè)網(wǎng)站建設(shè)好不好越秀seo搜索引擎優(yōu)化
  • 滄州哪里可以做網(wǎng)站中山seo排名
  • 網(wǎng)站url改版百度推廣年費多少錢
  • 家庭寬帶 做網(wǎng)站在哪里查關(guān)鍵詞排名
  • 網(wǎng)站群建設(shè)報價seo優(yōu)化就業(yè)前景
  • 東營做網(wǎng)站建設(shè)的公司友鏈網(wǎng)站
  • it運維培訓怎么做優(yōu)化關(guān)鍵詞
  • c 新手一個人做網(wǎng)站湖北seo公司
  • 民權(quán)平臺網(wǎng)站建設(shè)銷售人員培訓課程有哪些
  • 關(guān)鍵詞優(yōu)化排名用哪個軟件比較好南京百度seo排名優(yōu)化
  • 網(wǎng)站建設(shè) 從用戶角度開始網(wǎng)站建設(shè)及推廣優(yōu)化
  • 網(wǎng)站優(yōu)化公司 網(wǎng)絡(luò)服務(wù)產(chǎn)品運營推廣方案
  • 成都都江堰網(wǎng)站建設(shè)廣告媒體資源平臺
  • 重慶網(wǎng)站服務(wù)器建設(shè)推薦萬網(wǎng)
  • 百競(湘潭)網(wǎng)站建設(shè)網(wǎng)站seo哪家做的好
  • 網(wǎng)站小圖標素材下載汕頭百度seo公司
  • 網(wǎng)站公司鄭州百度關(guān)鍵詞優(yōu)化公司
  • 上海平臺有限公司seo的公司排名
  • 世界做詭異的地方網(wǎng)站站長工具seo綜合查詢收費嗎
  • 網(wǎng)站通知系統(tǒng)推廣公司哪家好
  • 和平網(wǎng)站制作百度熱門
  • 新橋?qū)I(yè)網(wǎng)站建設(shè)關(guān)鍵詞排名seo
  • seo怎么做網(wǎng)站的tdk怎么在線上推廣自己的產(chǎn)品
  • 塑料機械怎么做網(wǎng)站百度的網(wǎng)站網(wǎng)址