目录

记第一次跟踪seatunnel的任务运行过程三解析配置的具体方法getLogicalDag

记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag

前绪

从这里开始,就是使用seatunnel-2.3.9的源码了。前面部分没有变化,2.3.X版本都是通用的。 建议打开源码,边读文章,边阅读源码

正文

getLogicalDag()方法还是在ClientJobExecutionEnvironment这个类中。

关键词DAG

DAG:有向无环图。 LogicalDag:在此可以理解为一个seatunnel job的运行结构图。管理的是从source到transform到sink的过程。

解析配置文件,生成资源对

ImmutablePair, Set> immutablePair = getJobConfigParser().parse(null); getJobConfigParser().parse()方法中解析在seatunnel执行名中使用‘–config’指定的配置文件,将其中的source、transformer、sink解析成一个个的anction,并且将每个action(即source、transform、sink)所需要用到的jar包地址提取出来。

收集全部的action,以备后用(后面还收集了全部的jar包资源)

actions.addAll(immutablePair.getLeft()); 使用actions这个对象,直接引用所有的action,方便后续的使用。例如:遍历所有的action进行某个动作处理。

读去配置,确定是否自动上传jar包

boolean enableUploadConnectorJarPackage = seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable(); 前面已经解析出来的所有的action和对应用到的jar包,这里就是根据配置是否将jar自动上传到服务器。 默认值是:false,即不自动上传。代表着需要提前将需要用到的jar包上传到seatunnel的lib文件夹下。 这里的配置就是从${SEATUNNEL_HOME}/config/seatunnel.yaml这个配置文件中解析出来了的。但是2.3.9版本的seatunnel.yaml中默认是没有 seatunnel.engine.jar-storage.enable 这一项的,所以使用的基本都是默认值,即:false。

seatunnnl.yaml配置文件完整版及解析

seatunnel.yaml文件的解析对象对应的是org.apache.seatunnel.engine.common.config.server.ServerConfigOptions 这个类。 seatunnel.yaml中配置不全且没有明确的说明,可以到这个文件中查找。

处理jar包

配置seatunnel.engine.jar-storage.enable=true,上传jar包

if (enableUploadConnectorJarPackage) { Set commonJarIdentifiers = connectorPackageClient.uploadCommonPluginJars(Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars); Set commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers); Set pluginJarIdentifiers = new HashSet<>(); uploadActionPluginJar(actions, pluginJarIdentifiers); Set connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers); connectorJarIdentifiers.addAll(commonJarIdentifiers); connectorJarIdentifiers.addAll(pluginJarIdentifiers); jarUrls.addAll(commonPluginJarUrls); jarUrls.addAll(connectorPluginJarUrls); actions.forEach( action -> { addCommonPluginJarsToAction( action, commonPluginJarUrls, commonJarIdentifiers); }); } 如果要上传jar包,则将公共插件的jar包、前面解析出来的action使用到的jar包上传上去。 收集所有的jar包,并且给每个action添加公共插件jar包。

配置seatunnel.engine.jar-storage.enable=false(默认),不上传jar包

jarUrls.addAll(commonPluginJars); jarUrls.addAll(immutablePair.getRight()); actions.forEach( action -> { addCommonPluginJarsToAction( action, new HashSet<>(commonPluginJars), Collections.emptySet()); }); 收集所有的jar包,并且给每个action添加公共插件jar包。

结束:生成logicDag

getLogicalDagGenerator().generate() 生成一个logicDag,并返回。

后续