用python如何实现saga分布式事务?
Admin 2021-09-15 群英技术资讯 693 次浏览
这篇文章主要给大家分享saga分布式事务的内容,下文分享了saga的理论知识和用python实现saga分布式事务的实例,对大家理解saga分布式事务有一定的帮助,感兴趣的朋友可以参考一下,希望大家阅读完这篇文章能有所收获,下面我们一起来学习一下吧。
银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。
分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:
Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
目前可用于 SAGA 的开源框架,主要为 Java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:
DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:
下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:
对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。
首先我们创建账户余额表:
CREATE TABLE dtm_busi.`user_account` ( `id` int(11) AUTO_INCREMENT PRIMARY KEY, `user_id` int(11) not NULL UNIQUE , `balance` decimal(10,2) NOT NULL DEFAULT '0.00', `create_time` datetime DEFAULT now(), `update_time` datetime DEFAULT now() );
我们先编写核心业务代码,调整用户的账户余额
def saga_adjust_balance(cursor, uid, amount): affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d
and balance >= -%d" %(amount, uid, amount)) if affected == 0: raise Exception("update error, balance not enough")
下面我们来编写具体的正向操作 /补偿操作的处理函数
@app.post("/api/TransOutSaga") def trans_out_saga(): saga_adjust_balance(c, out_uid, -30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransOutCompensate") def trans_out_compensate(): saga_adjust_balance(c, out_uid, 30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransInSaga") def trans_in_saga(): saga_adjust_balance(c, in_uid, 30) return {"dtm_result": "SUCCESS"} @app.post("/api/TransInCompensate") def trans_in_compensate(): saga_adjust_balance(c, in_uid, -30) return {"dtm_result": "SUCCESS"}
到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用
# 这是 dtm 服务地址 dtm = "http://localhost:8080/api/dtmsvr" # 这是业务微服务地址 svc = "http://localhost:5000/api" req = {"amount": 30} s = saga.Saga(dtm, utils.gen_gid(dtm)) s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") s.submit()
至此,一个完整的 SAGA 分布式事务编写完成。
如果您想要完整运行一个成功的示例,那么参考这个例子yedf/dtmcli-py-sample,将它运行起来非常简单
# 部署启动 dtm # 需要 docker 版本 18 以上 git clone https://github.com/yedf/dtm cd dtm docker-compose up # 另起一个命令行 git clone https://github.com/yedf/dtmcli-py-sample cd dtmcli-py-sample pip3 install flask dtmcli requests flask run # 另起一个命令行 curl localhost:5000/api/fireSaga
假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?
这类网络异常的妥当处理,是分布式事务中的大难题,异常情况包括三类:重复请求、空补偿、悬挂,都需要正确处理
DTM 提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交。(子事务屏障详情参考分布式事务最经典的七种解决方案的子事务屏障环节)
我们把处理函数调整为:
@app.post("/api/TransOutSaga") def trans_out_saga(): with barrier.AutoCursor(conn_new()) as cursor: def busi_callback(c): saga_adjust_balance(c, out_uid, -30) barrier_from_req(request).call(cursor, busi_callback) return {"dtm_result": "SUCCESS"}
这里的 barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证 busi_callback 回调函数仅被提交一次
您可以尝试多次调用这个 TransIn 服务,仅有一次余额调整。
假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败
@app.post("/api/TransInSaga") def trans_in_saga(): return {"dtm_result": "FAILURE"}
我们给出事务失败交互的时序图
这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?
不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作;TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。
您可以将返回错误的 TransIn 改成:
@app.post("/api/TransInSaga") def trans_in_saga(): with barrier.AutoCursor(conn_new()) as cursor: def busi_callback(c): saga_adjust_balance(c, in_uid, 30) barrier_from_req(request).call(cursor, busi_callback) return {"dtm_result": "FAILURE"}
最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节
在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。
文中使用的 dtm 是新开源的 Golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、PHP 、node 、csharp 等语言的。同时提供了非常简单易用的接口。
以上就是关于用python如何实现saga分布式事务的内容,上文有具体的实例及详细的介绍,有需要的朋友可以参考学习,想要了解更多saga分布式事务的内容,大家可以继续浏览群英网络其他相关的文章。
文本转载自脚本之家
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
第一种,利用functools工具,调用reduce()函数求阶乘。||第二种,使用for循环来求阶乘。||第三种,直接定义一个阶乘函数func(),利用递归的方式来求阶乘即可。
这篇文章主要介绍了基于python的matplotlib制作双Y轴图,文中有非常详细的代码示例,对正在学习python的小伙伴们有很好地帮助,需要的朋友可以参考下
python下如何往数据库批量插入数据?方法是什么?假如我们想要往数据库表中的插入的数据有几百上千条,那么一条条插入,则调用sql语句查询插入就需要执行几百上千,这样花费的时间就非常的长。因此我们可以使用cursor.executemany(sql,args)来实现批量插入数据,那么具体怎样做呢?接着往下看。
pycharm中jupyter怎样安装使用?Jupyter也就是指Jupyter Notebook,是一个交互式笔记本,支持运行40多种编程语言,能够应用于数据清理和转换,数值模拟,统计建模,机器学习等。这篇文章主要分享在pycharm中,jupyter的使用。
python操作excel主要用到xlrd和xlwt两个库,xlrd读取表格数据,支持xlsx和xls格式的excel表格;xlwt写入excel表格数据
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008