asyncflow-06-项目应用
asyncflow-06-项目应用
学习核心
- 项目架构梳理、掌握代码核心模块设计
学习资料
- todo docker(docker安装配置、项目部署和性能压测)
项目工程目录
项目工程目录
- asyncflow-platform:asyncflow 核心
- flowsvr:web模块,负责和存储打交道(提供任务管理接口)
- worker:调度模块,负责占据任务、执行任务
- asyncflow_deal:服务治理模块
项目环境配置
- Springboot:
- MyBatis:
- Maven:
- MySQL
- Redis
项目启动
- 初始化数据库
- flowsvr:启动项目,调用接口创建任务
- worker:启动项目,模拟任务消费
不同业务接入
注意此处的项目定位是框架,而非提供异步处理的服务,它倾向提供一个框架模板,基于这个框架模板快速实现异步调用耗时服务的逻辑,只需关注异步调用业务逻辑的实现即可,对于任务管理、任务治理相关则由框架提供的接口完成。
如果希望业务完全隔离,则业务开发团队可以按照自己的需求单独部署自己的flowsvr、worker(多个flowsvr、多个worker),也可以基于统一管理的概念,构建一套通用的体系进行管理(使用同一个flowsvr,多个worker),不同的业务单独部署(这点有点类似于工作流引擎的整合概念,例如此前整合flowable框架,也是基于flowable整合出一套后台管理模块,其他业务接入则以子系统或者租户概念进行业务隔离,只不过通过构建统一的后台管理,本质上用的是同一套数据表)
项目模块介绍
首先理解每个项目模块的作用,如何启动?运作机制?实现细节?等,然后构建一个业务场景,掌握如何基于现有的框架接入一个新的业务类型并测试相应的任务生成、任务消费流程,理解并分析期间可能会遇到的异常问题,以及对于一些特殊流程处理的情况,任务治理模块是如何运作的
1.flowsvr
2.worker
3.async_deal(任务治理)
功能
分表-创建新表:处理当前写入表记录数量大于阈值时创建新表并向前滚动1
分表-向前滚动:处理已经没有可执行任务的表向前滚动
定时检查超时任务:处理超时任务,将任务更新为失败状态或者等待状态3
配置(application.yml配置)
# 项目自定义配置
deal:
timeout-tasks:
#超时任务最多一次处理1000个任务
limit-count: 1000
tasks :
#每个表的最大限制,不填默认为500w
table-limit: 50
# 数据库相关参数配置
spring:
datasource:
url: 你的数据库地址
username: 你的用户名
password: 你的密码
# 数据库地址需要添加a1lowMulti0ueries=true,主要是因为更新超时任务时是批量更新,会执行多条更新语句
项目改造测试
基础项目版本正常启动即可消费,完成相应的初始化配置即可,此处结合项目改造测试过程中可能遇到的问题和测试异常进行分析。
改造内容包括:项目结构调整(包结构)、bug修复、公共依赖拆分等
1.flowsvr 常见问题
接口调用异常排查思路:
- 接口调用排查:根据异常提示,确认请求参数、检查调用URL、确认响应参数
- 数据库操作排查:数据库操作通过try...catch...语句处理,需断点确认数据库操作
2.worker 启动调用常见问题
排查思路:
- worker 的部分实现是通过调用flowsvr 接口服务,因此要确认flowsvr的调用请求是否正常
- 理解业务的接入流程,拆解配置和代码实现中哪些部分息息相关,然后对照去分析
❓
❓java.lang.NoClassDefFoundError
反射时出现异常,通过“任务类型”参数构建,因此任务类型的配置要和业务代码的实现逻辑相对照,否则就会出现异常
Caused by: java.lang.NoClassDefFoundError: com/noob/worker/task/Lark (wrong name: com/noob/worker/task/lark)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
❓"scheduleLog" is null 导致反射处理失败
Caused by: java.lang.NullPointerException: Cannot invoke "com.noob.worker.model.ScheduleLog.getLastData()" because "scheduleLog" is null
at com.noob.worker.core.observers.TimeObserver.getScheduleLog(TimeObserver.java:194)
at com.noob.worker.core.observers.TimeObserver.onError(TimeObserver.java:134)
... 10 more
todo:异常任务的处理:构建异常日志,将异常的任务执行丢到异常日志中,schedule_log字段存储的是核心的调度日志信息(因为大部分情况是正常执行的,出现异常的情况可以通过额外的异常日志信息来记录跟踪)
任务执行出错
task_stage 和 任务的执行相关,task_stage 阶段决定要调用的方法,如果执行异常则可检查配置信息(确认task_stage的流转是否正常)
java.lang.NullPointerException: Cannot invoke "java.lang.reflect.Method.getName()" because "method" is null
at com.noob.worker.boot.AppLaunch.executeTask(AppLaunch.java:146)
at com.noob.worker.boot.AppLaunch.lambda$execute$0(AppLaunch.java:108)
业务接入示例
todo:请问如果将这个框架提供给其他人在项目中使用,使用者要怎么用呢?或者面试官问我在项目中怎么使用这个框架的 现在我想的是:把flowsvr下载下来启动,然后在项目中引入worker模块,进行一些配置比如用户id、flowsvr地址等,然后定义一个任务类实现接口,实现里面的方法,然后在项目中需要执行异步任务的地方调用flowsvr的创建任务接口创建任务,然后等待worker拉取任务执行。
- 获得框架的代码和设计
- 进行配置(比如worker向flowsvr请求,得知道它的地址,flowsvr访问db和redis也得配置ip,密码之类的)
- 自定义worker,使用者需要在worker中定义 调度任务后,任务的执行逻辑,以及如何解析上下文的代码
- 部署中间件(MySQL,Redis)
- 启动flowsvr和worker程序
之后使用方提前调用创建任务接口,让flowsvr访问db,将数据先存起来。之后worker请求flowsvr,来获取任务调度执行
项目代码核心:
- worker 使用到了观察者:观察者用于观察worker执行到哪个阶段,不同的阶段做出相应的动作
- 分布式锁:锁的粒度(根据不同的任务类型,此处任务类型即分布式锁的粒度),如果一个worker要执行多个不同的任务类型的话,也可以自定义业务逻辑开启多个线程获取不同任务类型的任务进行处理,此处并不矛盾 (锁的粒度是不同任务类型?每次拉的是同一个任务类型的数据,那一个worker怎么处理多个不同的任务类型????并不冲突)
分表问题:
wrk 测试参考:https://ls8sck0zrg.feishu.cn/wiki/Q4GlwVdXBiFlmBkT8TocDg3EnWg
Where 1=1 是一个很危险的操作!!! 如果所有条件不生效则之间返回所有的数据,这个逻辑是错误的!!!