asyncflow-01-项目构建
asyncflow-01-项目构建
学习核心
- 理解同步VS异步场景
- 理解asyncflow项目立意(异步、任务、框架),聚焦框架核心
- 针对异步任务场景
- 支持任务调度功能复用,提供任务调度和任务管理功能,以减少开发维护成本、提升效率
- 对比现有框架,理解为什么要手动造轮子(例如对比消息队列、工作流引擎等,结合框架聚焦核心对比分析:异步、轻量、任务调度和管理)
- 接入流程(搭建环境、跑通项目)
- 基于JAVA版本的环境配置,跑通项目
学习资料
项目背景&理解需求
1.同步&异步
同步 & 异步 概念核心
- 同步:发起请求,等待服务端执行完任务回包(响应)
- 异步:发起者是用接口发起任务 =》任务异步处理 =》发起者轮询结果
同步 & 异步 场景案例
例如一个接口的执行需要100分钟:
- 同步:发起请求,等待服务端执行完任务回包,需要100分钟
- 异步:发起任务=》异步执行=》轮询结果
- 发起者使用接口发起任务,随后接收到任务凭证(一般是任务ID),后续操作交由服务端处理
- 任务在服务端异步处理,处理需要100分钟
- 任务执行结果响应
- 方式1:轮询方式(可靠性兜底) =》发起者定时轮询结果(通过任务凭证调用检索接口),确认任务是否完成
- 方式2:回调方式(更加及时)=》服务端在任务执行完成之后进行回调(存在两个限制:调用方需提供接受回调的地址、需考虑回调失败的场景)
对比此场景分析,同步是直接调用接口并等待任务执行完成,而异步则是将接口进行拆分,先调用执行接口记录下要执行的任务(至于服务端什么时候执行则结合场景进行设定),随后每隔一段时间调用检索接口查询任务状态,具体的任务处理交由服务端后台去处理。
例如生活中的案例场景:去派出所办理身份证的正常流程所需时间为3天
- 同步则是办理身份证手续完成后一直在柜台干等3天等待响应结果
- 异步则是待手续完成后可以直接回家做其他事情,随后3天内可以不定时咨询下办理结果
同步和异步的核心区别在于接口调用后处理方是否真的执行任务:
- 同步是调用接口真正执行任务,等待响应任务执行结果
- 异步则是调用接口告知要执行任务,服务端响应任务执行的"登记"结果(具体的任务执行由服务端后台决定)
同步的局限在于耗时操作阻塞进程,在任务执行期间进程无法做任何事情,如果任务执行很长一段时间之后却被告知失败的话,这个过程是很心塞的。而引入异步的优势在于,在具备任务执行条件时发起执行任务的请求,此时客户端不需要关注服务端后台的执行情况,因为这是一个耗时的过程,只需要在后续的时间周期内轮询结果即可,真正的任务执行由服务端决定。
异步适用场景
结合上述场景分析可知,异步引入的场景是针对耗时操作的一个场景优化,所谓耗时操作即任务执行需要很长的一段时间。
常见的基础业务场景案例有:视频转码、区块链处理、AI分析、审核操作、深度学习等,这也是asyncflow项目面向的场景,这种底层处理一般对上层体现为暴露两个接口:发起任务接口、查询任务接口,而asyncflow项目框架则是可以串联多个这种异步任务来达到解决异步场景问题的目的。
因此此处也可以理解为asyncflow项目是针对处理异步任务而生,它的核心责任是调度,而不是具体做底层的事情(底层的事情可以理解为使用封装好的内容或借助第三方的组件、框架,这部分是透明的)
【视频转码】场景:阶段1-审核、阶段2-获取元信息、阶段3-转码
【深度学习】场景:阶段1-数据采集、阶段2-数据清洗、验证、阶段3-数据增强
2.asyncflow构建场景(项目立意)
项目立意
(1)为什么要做asyncflow框架?
可以结合三个方面去扩展说明:异步、任务、框架
- 异步:针对异步场景衍生的框架:针对要异步的操作或者第三方化的接口调用相关
- 任务:这个框架提供任务调度、任务管理的功能,以简化开发流程、降低维护成本,提供异步能力复用和任务管理的支持
- 框架:框架诞生的原因,一般可能会说有一个业务场景开发需要用到异步调用相关,所以写了个框架,但实际上这是一个脱离实际的思路(比较离谱、站不住脚)。任何框架的抽离都不可脱离业务开发实际场景,实际上这个框架的衍生应该是要结合业务去说明,例如在写业务A的时候编写了异步调用相关代码逻辑,然后后续在写业务B的时候发现也用到类似的思路,而针对这块的代码可以考虑复用逻辑,于是思考是否可以将异步调用这块内容单独抽离出来整合成一个框架(或者是通用的内容)
项目立意
针对项目立意,要说明清楚为什么要做?有没有现成可用的内容?做的话怎么做?做出来的结果是怎样的?,针对框架实现可以结合下面几点进行扩展:发现问题=》突发灵感=》市场调研=》确定方案=》解决问题=》得到验证
- 发现问题:针对业务开发场景,用到一些异步调用相关的内容,重复编写类似的逻辑代码
- 突发灵感:针对这类重复的操作,是否可以考虑脱离业务逻辑,将核心功能抽离出来
- 市场调研:例如秒杀场景使用消息队列、工作流引擎的引入等,对比说明这些技术栈的实践应用和场景适配性,引出为什么要自定义这个框架
- 确定方案:结合现有场景,确定技术方案和架构设计
- 解决问题:基于现有场景优化框架设计,测试框架是否可用
- 得到验证:真正将框架与实际业务进行结合,确认是否起到作用(最基础的就是是否真正减轻业务代码开发工作、能否有效提升开发效率)
切记不要说是因为学习驱动而去做这件事,要学会去除学生思维,核心方向应该是主动发现工作问题、创造工作价值
挂靠点
框架构建完成,应该要有实际应用场景支持,即如果将这个框架实际和自身项目融合使用,因此要择选一个挂靠点(例如可以和实际项目进行结合,例如实验研究项目、公司项目、GitHub项目等)
- 实验室:例如实验研究项目(AI相关等)
- 公司:将框架与公司实际项目需求融合,阐述框架应用
- Github(一般不太建议):开源网站项目,做开源是一个比较难把握的度,如果private则难以验证真伪,如果public又很难体现效能
现有开源方案参考
- Golang开源方案调研
- Java 开源方案调研
- XXX-JOB:比较复杂,还引入了其他的内容(例如定时任务等)
- Activiti、flowable:BPMN工作流概念,更倾向任务流转管理的场景应用
Asyncflow VS 其他开源异步任务框架
异步任务框架搭建-开源方案对比:结合自身优势进行说明
- 场景:专注异步任务
- 轻量:极其轻量
- 特性:重试多策略、秒级优先级、支持上下文更新
要点 | Celery | Machinery | Activiti/flowable | XXX-JOB | Asyncflow |
---|---|---|---|---|---|
接入成本 | 低 | 低 | 高 | 中 | 低 |
复杂度 | 低 | 低 | 高 | 中 | 低 |
重试策略 | 支持间隔重试 | 支持间隔重试 | 支持间隔重试 | 支持间隔重试 | 支持灵活的重试机制 |
优先级 | 常规优先级 | 常规优先级 | 常规优先级 | 常规优先级 | 支持秒级优先级 |
多阶段 | 多任务串联 | 多任务串联 | 多任务串联 | 多任务串联 | 单任务多阶段 |
场景 | 异步任务 | 异步任务 | BPMN/工作流 | 异步任务 定时任务 | 异步任务 |
Asyncflow 项目架构:
- 业务开发的抽象
- 架构轻量
- 功能垂直,聚焦异步场景
(2)核心理解
需求概述(项目立意):针对一些耗时的多阶段异步任务调度场景
生产环境中,对外能力除了以同步接口的方式提供,很多业务还会涉及异步流程,比如音视频处理、区块链、审核等耗时操作。异步流程说明如下:
- 【1】发起者向同步接口发起任务
- 【2】任务异步处理
- 【3】发起者轮询结果
asyncflow相关构建的立意则是针对这类业务(耗时操作),提供通用的任务异步处理框架。
传统从0-1开发一个异步业务,需要考虑任务管理、异常处理、流量控制等细节,而使用通用的asyncflow框架可以有效缩短开发时间周期,提升开发效率(通常开发需2周,而引入框架1天就能搞定)
此处的异步场景需区分于Redis学习篇章中秒杀海量的异步场景,秒杀场景中的异步是针对秒杀场景的一种性能优化,而此处的异步场景则是真正针对耗时操作的异步执行,需要和秒杀场景中的”消息队列“的应用做区分。此处asyncflow项目的适用场景分析如下:
- 整体异步
- 分多阶段、阶段之间有依赖
- 建议每个阶段最好也是异步,同步则放在不同阶段
例如一个音视频处理任务,一般需审核、等待审核完成之后才可以进行下一步操作(例如获取视频元信息等)
可以理解为,处理一个任务,这个任务可以拆分为多个阶段,阶段和阶段之间是同步进行、且可以存在依赖,通过引入asyncflow框架不仅可以复用异步调用机制,还可将这些相互依赖的不同阶段以任务管理的形式管理起来,便于客户端跟踪任务执行状态
思考:为什么不直接用消息队列?
asyncflow框架的核心是异步能力复用、节约开发成本,此处不使用消息队列的核心原因是场景支持的问题,结合以下2点理解:
- 要点1:支持上下文更新操作。异步场景可能并不止一个操作,比如一个视频处理任务,阶段1为查询视频元信息、阶段2为鉴黄、阶段3为是转码,需要记录每一阶段的结果,而消息队列通常是负责消息的流转,不支持任务中间数据的更新(但可通过设计消息不断流转的模型来引用消息队列达到上下文更新目的)
- 要点2:支持任务管理。比如某个用户下进行中的任务列表等常见的查询方式,需要提供关系化管理的能力,此处由asyncflow统一提供则不需要开发者重复造轮子,而消息队列无法直接提供任务查询的功能
针对此场景概念的理解不应太过于钻牛角尖,虽然在某些场景下消息队列可以以”曲线救国“的形式来达成目的,但实际上要理解适配性原则,应因地制宜。此处结合这两个要点来理解消息队列和ansyncflow的对比:即理解有些事情谁可以做,但是谁更适合去做
消息队列(kafka、RabbitMQ、RocketMQ)的本质是传递消息;而asyncflow框架的本质是任务调度和任务管理(可查询、可更新)
支持上下文更新操作
- 消息队列:通常用作负责消息的流转,可以采用消息不断流转的模型来达到上下文更新目的,例如可以将每个阶段对应一个消息队列,如果阶段之间存在依赖,则在上一阶段处理后将下一阶段的前置条件存入数据库(用作消息丢失补偿),然后将这个消息放到下一阶段的消息队列中,在这个任务执行过程中产生多个消息,通过消息不断流转的方式达成上下文更新目的。
- asyncflow:整个任务执行过程中始终只有一个任务,每个任务可能关联不同阶段的数据信息,阶段数据则通过任务关联检索即可
支持任务管理
- 消息队列:倾向组件应用,虽然可以借助一些第三方框架查看消息队列信息,但显示并不直观(无法适配业务设定,需额外扩展)
- 例如发送消息一般是发送关键的核心消息用作消费,如果说每次发送消息都发送”全量数据“以提供查询支撑,不仅在性能上有所消耗,还需提供额外的开发成本和维护支持
- asyncflow:倾向异步任务框架,提供任务管理(可灵活检索任务关联信息)
- 消息队列:倾向组件应用,虽然可以借助一些第三方框架查看消息队列信息,但显示并不直观(无法适配业务设定,需额外扩展)
因此,结合整体异步场景而言,引入asyncflow框架的核心是为了更好地简化开发流程和维护成本,提供异步能力复用和任务管理的支持
思考:为什么不直接用工作流?
类似Activiti、flowable这类工作流框架,更倾向企业模型构建,偏向于企业管理的工作流,它的接入成本和复杂度都比较高,于普通的业务场景开发而言工作流引擎的引入太重了,有一定的开发和维护成本
场景案例
(1)音视频场景
音视频流程说明(此处简化为2阶段处理进行案例分析)
- 阶段1:审核(鉴黄等)
- 阶段2:转码
此处视频处理的核心是理解如何处理视频,其具体实现可以借助第三方平台进行接口调用完成操作:
针对第三方平台接口调用,其核心是场景的HTTP调用方式,通过传递参数调用接口,随后获取到响应的taskId等信息
上下文存储概念
所谓的上下文信息,可以理解为每个阶段流转需要使用的一些中间数据信息,这些数据可以结合场景自定义对象,将JSON序列化之后存入MySQL数据库中,需要用到的时候再反序列化回来。
审批概念
针对审批,可以理解为框架之外单独一个微服务,这个微服务可以创建一条一条表单,每个表单有个审批状态框架只负责调度,当成和音视频场景一样,审批服务就是个第三方服务,多级审批就是多阶段任务,每个阶段创建条表单
具体来说,每个阶段,就向审批微服务发起一个请求,审批微服务产生一条表单,并返回一个审批表单任务ID,框架拿到这个ID,就可以通过ID查询结果。 实际场景中很多时候审批系统,本身就是一条表单,可以流转状态,可能并不需要额外引入微服务设计。此处为了能融入Asyncflow,把审批服务设计成只能生成一条一条表单的微服务,这其实也是基于目的来设计系统,这种设计说不上离谱,但确实容易被挑战。如果被挑战,可以解释为希望审批服务比较职责单一,多阶段任务应该由更上层(也就是框架)来进行组合
worker 的工作流程
可以理解为worker的核心工作是拉取任务、执行任务,当一个任务来到worker,随后就会进入process处理函数(这个函数是任务处理的一个聚合函数概念,它会根据任务的 task_stage 字段判断当前任务所处阶段,然后通过switch case 执行相应的内容,这些都是由使用asyncflow框架自定义实现的业务逻辑)
如果要结合实际业务场景,可能代码会有点难看(可以引入相关设计模式进行代码结构和实现优化),此处主要是为了理清worker的调度流程
(2)业务场景融入
什么场景能融入呢?
其实只要满足做的事情是异步的即可,比如AI计算、图像处理、大规模数据导入、文件下载等一次处理的耗时较长的一些通用服务
怎么融入呢?
抽离核心业务,独立成一个单独的服务(目的为了单一职责),切忌不要丢一堆具体业务细节,容易造成沟通困难(简述业务场景和服务核心即可)。抽象点来说就是有个具体业务的单独服务(类似上述腾讯云提供的服务接口概念),asyncflow框架通过调用这个服务的接口实现异步,这个微服务需提供创建任务和查询任务的接口。
项目核心
(1)功能列表
- 支持任务创建(同步返回任务ID);
- 支持通过任务ID查询;
- 任务自动调度,维护任务状态;
- 支持各种任务快速注册;
- 任务支持优先级调度;
- 服务挂掉恢复后可以自动续做;
- 灵活的任务配置;
- 任务使用MySQL之类DB来存储;
(2)性能指标
2核4G的虚拟机下,创建任务接口、占据任务接口、查询任务结果接口能达到2000QPS(不同环境测出的数据结果可能有所不同,此处是一个参考的量级,可结合实践进行验证),并可以水平扩展(1台机器2000QPS,2台机器可达4000QPS,理想状态下总qps=服务器数量*单台服务器QPS数)
(3)核心问题
- 存储:MySQL
- 多机竞争问题:多个Worker如何竞争
- 数据结构:任务的结构设计是怎样的?
- 任务状态机:任务状态如何流转
- 异常处理机制:如果服务挂掉、任务卡死(任务长时间处于执行中)会如何处理
3.接入流程(JAVA版本)
首先理解项目怎么玩,然后再进一步掌握项目代码实现细节
项目启动
项目说明
- asyncflow
- flowsvr 模块:提供任务管理相关接口
- worker 模块:“消费者” =》拉取任务并执行,由开发团队自定义实现业务逻辑完成任务调度
- async_deal 项目(服务治理模块/任务治理模块):用于处理一些特殊情况
初始化SQL
- 初始化SQL内容,创建数据表
项目环境配置、编译、启动(本地部署)
版本:asyncflow-java-0926、asyncflow_deal(此处针对现有项目版本说明,后续优化版本参考【06-项目应用】中的一些扩展说明)
JAVA版本:先启动flowsvr项目,后启动worker项目
项目环境准备:MySQL-5.7.44、Redis、Maven等JAVA环境
asyncflow 工程包括flowsvr、worker两个模块,配置maven环境
flowsvr 项目
- flowsvr 项目配置:
resources/application.yml
配置,修改数据库配置 - flowsvr 项目启动:先启动
FlowsvrApplication
- flowsvr 项目配置:
worker 项目
- worker 项目启动:后启动
WorkerApplication
(worker定时拉取任务并执行,需要调用flwosvr服务接口)
- worker 项目启动:后启动
asyncflow_deal:任务治理模块,用于处理一些特殊情况
业务流程测试
worker 细节
worker的实现逻辑:在版本实现中worker通过"反射"实现任务不同阶段的调度逻辑,Java代码实现和数据库配置主要做了如下绑定:
- 将数据库表task的taskType配置与JAVA代码中的类进行绑定:例如taskType为Lark,则其关联的任务为packageName.Lark
- 将数据库表task的taskStage配置与对应任务类的阶段执行方法进行绑定:例如当taskStage中“executeXXX”,则该阶段相应执行Lark类中的executeXXX方法
因此在构建测试数据的时候,需确保任务类型在各个表中保持一致(一般情况下确定好任务类型),worker中通过反射实现多阶段任务的调度逻辑,因此任务类型的设定和类的定义要一致(或者配置相应的映射,不受限于设定,通过配置化映射的方式将任务类型和要执行的方法关联起来),其次对应任务的执行阶段taskStage与对应任务类的执行函数的定义一致(通过指定的taskStage定位到要执行的方法名,随后通过上下文调用相应的方法)
测试流程说明
【1】初始化数据表,并插入一条任务类型taskType为‘Lark’的任务调度配置和任务调度位置信息
【2】启动flowsvr,确保接口服务正常提供支持
【3】创建任务:执行worker中的测试方法testCreateTask()
创建一个任务
【4】执行任务:启动worker程序,拉取任务并执行,观察执行日志和数据库记录的变化情况
为了更好地观察调度过程和测试结果,可以先创建一个任务,任务每执行完一个阶段就观察任务记录的变化情况,理解任务调度执行的原理和过程
此处对于一个任务由始至终只有一条信息记录,任务的多阶段体现在taskStage和status的组合。不同于一些流程引擎,他们可能会将任务拆分为多节点,每个阶段的执行都对应一条记录,便于后续业务跟踪,其主要切入点是关注任务的流转过程(常用于一些流程管理类的信息系统)
asyncflow 思维导图(自定义思维导图)
如果要在程序里面拉取多个不同任务是不是可以这样构造?或者还可以有别的方法?
相当于并行流加载处理不同任务,最后再把结果合并?其实现就是为了并行HTTP获取任务,这里相当于有俩线程池,并行流用的是ForkJoinPool,execute方法里又调用了我们自己的线程池???用的parallelStream来实现并行的?
todo:利用了并行流(parallelStream())来并行处理scheduleCfgDic.values()集合中的元素,但这并不意味着它会显式地创建一个新的线程池。并行流内部会根据系统配置和当前环境选择合适的线程数来执行任务。 具体来说: scheduleCfgDic.values().parallelStream():这一步将scheduleCfgDic字典的所有值转换成一个并行流。 .forEach(config -> {execute(config.getTask_type());}):然后,对于流中的每一个config对象,它会执行execute(config.getTask_type())方法。由于这是在一个并行流中,这些方法调用可能会在多个线程上同时进行,具体取决于系统的并行度设置。 因此,虽然它确实可能利用多线程来加速处理,但具体的线程管理是由Java的并行流实现自动处理的,而不是代码中明确创建的线程池。
思考下会不会存在什么其他问题??
java 如何实现定时拉取:for死循环,然后里面会去获取一个间隔时间,然后拉取完一次,Thread.sleep间隔时间
或者使用定时器??
解耦;职责分明,业务接入只需关注worker实现逻辑、部署worker。