记第一次跟踪seatunnel的任务运行过程四getJobConfigParser.parse的动作
记第一次跟踪seatunnel的任务运行过程四——getJobConfigParser().parse()的动作
前绪
正文
书接上文
ImmutablePair, Set> immutablePair = getJobConfigParser().parse(null); 在前一篇文章中说到getLogicDag()方法的第一行(如上),执行了一个解析方法获得了action和jar包资源对,这一步就是由parse()方法来执行的。本片文章就是对parse()方法的源码探索。
任务配置文件
先回顾一下执行seatunnel.sh时命令中‘ –config ’指定的任务配置文件的结构: env{} source{} transform{} sink{} 主要有以上四部分
获得任务配置解析器
getJobConfigParser()方法也org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment中的一个受保护的方法,在这个方法中直接返回了一个MultipleTableJobConfigParser(多表任务配置解析器)对象。这也与Seatunnel官网宣传的支持耽搁文件中同时同步多张表的数据相一致。 所以对parse()方法的调用,实际就是由MultipleTableJobConfigParser的parse方法的承担执行的。 在getJobConfigParser()方法方法中,有对‘从保存的检查点开始任务’的支持。但是从执行的代码路径上来看,创建ClientJobExecutionEnvironment对象的时候,其中isStartWithSavePoint参数给的是false,所以并没有启用。 确实我们的探索是从一个全新的任务开始并不是从一个save point开始。
填充jobClient对象(解析env)
this.fillJobConfigAndCommonJars(); 获取‘ –config ’指定的配置文件中的env部分的内容:
- 提取任务模式(jobModel),是stream流式的任务,还是batch批任务;
- 提取任务命名;
- 提取其他env部分的配置;
- 提取env部分指定的第三方jar包的uri,并且封装为commonPluginJars(公共插件jar包);
解析source、tranform、sink
List extends Config sourceConfigs = TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, “source”, Collections.emptyList()); List extends Config transformConfigs = TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, “transform”, Collections.emptyList()); List extends Config sinkConfigs = TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, “sink”, Collections.emptyList()); 解析source、transform、sink部分的内容,获取各部分的配置信息。
解析connector
List sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE); List transformConnectorJars = getConnectorJarList(transformConfigs, PluginType.TRANSFORM); List sinkConnectorJars = getConnectorJarList(sinkConfigs, PluginType.SINK); 获得source、transform、sink三部分的使用的connector信息。
换用seatunnel自己的classLoader
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader(); 将当前使用中的classLoader暂存。 List sourceJars = Stream.of(sourceConnectorJars, transformConnectorJars) .flatMap(Collection::stream) .distinct() .collect(Collectors.toList()); ClassLoader sourceAndTransformClassLoader = getClassLoader(classLoaderService, parentClassLoader, sourceJars); ClassLoader sinkClassLoader = getClassLoader(classLoaderService, parentClassLoader, sinkConnectorJars); 将source和transform的connector jar包资源合并去重,然后分别创建“source与transform共用的classLoader”以及“sink使用的classLoader”。 Thread.currentThread().setContextClassLoader(sourceAndTransformClassLoader); 先使用“source与transform共用的classLoader”进行处理
检查配置文件是否可有构建成DAG
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs); 检查source、sink、transform(如有)能否组成一个DAG。
checkGraph方法中
if (CollectionUtils.isEmpty(sources) || CollectionUtils.isEmpty(sinks)) { throw new JobDefineCheckException(“Source And Sink can not be null”); } 如果缺少source或sink直接报错,source和sink作为执行任务的必备元素,不可或缺。 if (isSimpleGraph(sources, transforms, sinks)) { checkSimpleGraph(sources, transforms, sinks); return; } checkComplexGraph(sources, transforms, sinks); 如果是简单的图(graph)则执行简单查询,执行完直接返回,否则就执行复杂检查。
是否是简单图:isSimpleGraph(sources, transforms, sinks)
sources.size() == 1 && sinks.size() == 1 && (CollectionUtils.isEmpty(transforms) || transforms.size() == 1) 如果source下只有一个内容,并且sink下也只有一个内容,并且没有transform或只有一个transform,则认为是简单图。 这也是最常用的两种配置模式
- 单source+单sink
- 单source+单transform+单sink
执行简单检查:checkSimpleGraph(sources, transforms, sinks)
ReadonlyConfig source = ReadonlyConfig.fromConfig(sources.get(0)); ReadonlyConfig sink = ReadonlyConfig.fromConfig(sinks.get(0)); if (transforms.size() == 0) { checkEdge(source, sink); } else { ReadonlyConfig transform = ReadonlyConfig.fromConfig(transforms.get(0)); checkEdge(source, transform); checkEdge(transform, sink); }
- 如果没有transform,就检查source的plugin_output(之前版本是result_table_name)和sink的plugin_input(之前版本是source_table_name)是否匹配。
- 如果有transform,就检查source的plugin_output(之前版本是result_table_name)和transform的plugin_input(之前版本是source_table_name)是否匹配;以及就检查transform的plugin_output(之前版本是result_table_name)和sink的plugin_input(之前版本是source_table_name)是否匹配。 在此过程中,若配置文件中没有配置plugin_input和plugin_output,则使用默认的名称default-identifier代替。
解析source、transform、sink配置内容
for (int configIndex = 0; configIndex < sourceConfigs.size(); configIndex++) { Config sourceConfig = sourceConfigs.get(configIndex); Tuple2» tuple2 = parseSource(configIndex, sourceConfig, sourceAndTransformClassLoader); tableWithActionMap.put(tuple2._1(), tuple2._2()); } log.info(“start generating all transforms.”); parseTransforms(transformConfigs, sourceAndTransformClassLoader, tableWithActionMap); Thread.currentThread().setContextClassLoader(sinkClassLoader); log.info(“start generating all sinks.”); List sinkActions = new ArrayList<>(); for (int configIndex = 0; configIndex < sinkConfigs.size(); configIndex++) { Config sinkConfig = sinkConfigs.get(configIndex); sinkActions.addAll( parseSink(configIndex, sinkConfig, sinkClassLoader, tableWithActionMap)); } Set factoryUrls = getUsedFactoryUrls(sinkActions); return new ImmutablePair<>(sinkActions, factoryUrls); 逐步解析source、transform、sink的配置内容,最终组装成action返回。 从MultipleTableJobConfigParser的类名刻制,这是支持多表配置文件解析的,所以在解析source、transform、sink时都有循环便利的代码存在。
结束
Thread.currentThread().setContextClassLoader(parentClassLoader); if (classLoaderService != null) { classLoaderService.releaseClassLoader(Long.parseLong(jobConfig.getJobContext().getJobId()), sourceJars); classLoaderService.releaseClassLoader( Long.parseLong(jobConfig.getJobContext().getJobId()), sinkConnectorJars); } 换回默认的类加载器,并释放“source与transform共用的classLoader”以及“sink使用的classLoader”资源。