AWS (Learning Notes, Lesson 33): Using CDK in Depth, Practicing AWS Athena
- Use CDK to create Athena and an AWS Glue crawler

What this lesson covers:

- Using AWS Athena + AWS Glue crawler
1. Using AWS Athena
1.1 What is AWS Athena
AWS Athena is a data-analysis service provided by AWS that lets you analyze data stored on S3 with SQL. Its main characteristics:

- It is a managed service, so there is no infrastructure to maintain.
- It is built on open-source frameworks.
- You are billed by the amount of data each query processes.
- It provides encryption for the data.
Note: Athena cannot perform JOIN operations against an RDB; it only offers queries over data files such as CSV and JSON.
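To make this concrete, here is a minimal sketch (not part of this lesson's CDK code) that runs one ad-hoc SQL query through Athena with boto3; the database, table, and output-bucket names are hypothetical placeholders:

import boto3

# a minimal sketch: start one SQL query in Athena and print its execution id
# (the database/table/output-bucket names are hypothetical placeholders)
athena_client = boto3.client('athena')
response = athena_client.start_query_execution(
    QueryString='SELECT * FROM "my-database"."my-json-table" LIMIT 10',
    QueryExecutionContext={'Database': 'my-database'},
    ResultConfiguration={'OutputLocation': 's3://my-query-output-bucket/'}
)
print(response['QueryExecutionId'])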
1.2 What is AWS Glue
AWS Glue is a managed ETL service provided by AWS. It makes preparing and loading data for analysis simple. The metadata that ties tables to their schemas can be stored as the AWS Glue Data Catalog.
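As a quick illustration of that metadata, the sketch below lists the tables and columns one catalog database holds, via boto3; it assumes a database named log-database (the one created later in this lesson) already exists:

import boto3

# a minimal sketch: list the tables and columns stored in the Data Catalog
# (assumes the database "log-database" already exists)
glue_client = boto3.client('glue')
for table in glue_client.get_tables(DatabaseName='log-database')['TableList']:
    columns = [column['Name'] for column in table['StorageDescriptor']['Columns']]
    print(table['Name'], columns)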
1.3 Why use AWS Athena together with AWS Glue

Combining AWS Athena with AWS Glue lets you query the databases and schemas that AWS Glue creates directly from AWS Athena.
2. Practicing AWS Athena
2.1 Code link
2.2 Overall architecture
2.3 Code walkthrough
2.3.1 Create the S3 bucket for the test data
# creating the buckets where the logs will be placed
logs_bucket = s3.Bucket(self, 'logs-bucket',
                        bucket_name=f"auditing-logs-{self.account}",
                        removal_policy=RemovalPolicy.DESTROY,
                        auto_delete_objects=True
                        )
2.3.2 Create the S3 bucket for the query results
# creating the bucket where the queries output will be placed
query_output_bucket = s3.Bucket(self, 'query-output-bucket',
                                bucket_name=f"auditing-analysis-output-{self.account}",
                                removal_policy=RemovalPolicy.DESTROY,
                                auto_delete_objects=True
                                )
2.3.3 Sync the sample JSON data files to the S3 bucket
# uploading the log files to the bucket as examples
s3_deployment.BucketDeployment(self, 'sample-files',
                               destination_bucket=logs_bucket,
                               sources=[s3_deployment.Source.asset('./log-samples')],
                               content_type='application/json',
                               retain_on_delete=False
                               )
2.3.4 Create the AWS Glue CfnDatabase
# creating the Glue Database to serve as our Data Catalog
glue_database = glue.CfnDatabase(self, 'log-database',
                                 catalog_id=self.account,
                                 database_input=glue.CfnDatabase.DatabaseInputProperty(
                                     name="log-database"))
2.3.5 Create the IAM Role the AWS Glue crawler needs
# creating the permissions for the crawler to enrich our Data Catalog
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                             role_name='glue-crawler-role',
                             assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                             managed_policies=[
                                 # Remember to apply the Least Privilege Principle and provide only the permissions needed to the crawler
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AmazonS3FullAccess',
                                                                           'arn:aws:iam::aws:policy/AmazonS3FullAccess'),
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                           'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                             ])
Two managed policies are needed here: AmazonS3FullAccess and AWSGlueServiceRole.
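As the comment in the code warns, AmazonS3FullAccess is far broader than the crawler needs. A least-privilege sketch using this stack's own names: keep the AWSGlueServiceRole managed policy, but replace the S3 policy with an inline statement scoped to the logs bucket only:

# a sketch of a least-privilege alternative: keep AWSGlueServiceRole but
# grant S3 read access only on the logs bucket instead of AmazonS3FullAccess
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                             role_name='glue-crawler-role',
                             assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                             managed_policies=[
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                           'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                             ])
glue_crawler_role.add_to_policy(iam.PolicyStatement(
    actions=['s3:GetObject', 's3:ListBucket'],
    resources=[logs_bucket.bucket_arn, f'{logs_bucket.bucket_arn}/*']
))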
2.3.6 Create the AWS Glue crawler
# creating the Glue Crawler that will automatically populate our Data Catalog. Don't forget to run the crawler
# as soon as the deployment finishes, otherwise our Data Catalog will be empty. Check out the README for more instructions
glue.CfnCrawler(self, 'logs-crawler',
                name='logs-crawler',
                database_name=glue_database.database_input.name,
                role=glue_crawler_role.role_name,
                targets={
                    "s3Targets": [
                        {"path": f's3://{logs_bucket.bucket_name}/products'},
                        {"path": f's3://{logs_bucket.bucket_name}/users'}
                    ]
                })
Here the AWS Glue crawler scans the products and users data files in the S3 bucket, infers the schema of the JSON files, and registers the resulting tables in the Glue database (the Data Catalog) so that Athena can query them. The data itself stays in S3; only the metadata is loaded into the catalog.
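The crawler defined above only runs on demand. If you would rather not start it by hand after every deployment, CfnCrawler also accepts a schedule; a sketch of the same crawler (replacing the definition above) with a daily run, where the cron expression is only an example:

# a sketch: the same crawler, but scheduled to run every day at 01:00 UTC
# so the Data Catalog is refreshed without a manual start
glue.CfnCrawler(self, 'logs-crawler',
                name='logs-crawler',
                database_name=glue_database.database_input.name,
                role=glue_crawler_role.role_name,
                schedule=glue.CfnCrawler.ScheduleProperty(
                    schedule_expression='cron(0 1 * * ? *)'),
                targets={
                    "s3Targets": [
                        {"path": f's3://{logs_bucket.bucket_name}/products'},
                        {"path": f's3://{logs_bucket.bucket_name}/users'}
                    ]
                })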
2.3.7 Create the AWS Athena workgroup
# creating the Athena Workgroup to store our queries
work_group = athena.CfnWorkGroup(self, 'log-auditing-work-group',
                                 name='log-auditing',
                                 work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
                                     result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
                                         output_location=f"s3://{query_output_bucket.bucket_name}",
                                         encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                                             encryption_option="SSE_S3"
                                         ))))
AWS Athena is managed through workgroups: after the workgroup is created, the queries are then created inside it. The SSE_S3 option above means the query results written to the output bucket are encrypted with S3-managed keys.
2.3.8 Create the AWS Athena queries
# creating an example query to fetch all product events by date
product_events_by_date_query = athena.CfnNamedQuery(self, 'product-events-by-date-query',
                                                    database=glue_database.database_input.name,
                                                    work_group=work_group.name,
                                                    name="product-events-by-date",
                                                    query_string="SELECT * FROM \"log-database\".\"products\" WHERE \"date\" = '2024-01-19'")

# creating an example query to fetch all user events by date
user_events_by_date_query = athena.CfnNamedQuery(self, 'user-events-by-date-query',
                                                 database=glue_database.database_input.name,
                                                 work_group=work_group.name,
                                                 name="user-events-by-date",
                                                 query_string="SELECT * FROM \"log-database\".\"users\" WHERE \"date\" = '2024-01-22'")

# creating an example query to fetch all events by the user ID
all_events_by_userid_query = athena.CfnNamedQuery(self, 'all-events-by-userId-query',
                                                  database=glue_database.database_input.name,
                                                  work_group=work_group.name,
                                                  name="all-events-by-userId",
                                                  query_string="SELECT * FROM (\n"
                                                               "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"products\" \n"
                                                               "UNION \n"
                                                               "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"users\" \n"
                                                               ") WHERE \"userid\" = '123'")
2.3.9 Adjust the creation order
# adjusting the resource creation order: the named queries reference the
# workgroup only by its string name, so CloudFormation cannot infer the
# dependency on its own and the workgroup must be forced to deploy first
product_events_by_date_query.add_dependency(work_group)
user_events_by_date_query.add_dependency(work_group)
all_events_by_userid_query.add_dependency(work_group)
2.4 Running the AWS CDK app for Athena
2.4.1 Deploy
python -m venv .venv
source .venv/Scripts/activate   # Windows; on Linux/macOS use: source .venv/bin/activate
pip install -r requirements.txt
cdk synth
cdk --require-approval never deploy
2.4.2 Run the crawler

By default the crawler does not start on its own after deployment; you need to run it yourself.
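Besides clicking Run in the console, the crawler can be started programmatically; a minimal sketch with boto3, using the crawler name defined in 2.3.6:

import time

import boto3

# a minimal sketch: start the logs-crawler and wait until it returns to READY
glue_client = boto3.client('glue')
glue_client.start_crawler(Name='logs-crawler')
while glue_client.get_crawler(Name='logs-crawler')['Crawler']['State'] != 'READY':
    time.sleep(10)  # the crawler moves through RUNNING and STOPPING first
print('crawler finished')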
Once it finishes successfully, the JSON files in the S3 bucket have been crawled and the corresponding tables appear in the AWS Glue database, ready to be queried.
2.4.3 View the AWS Athena queries

In the AWS Athena console: Query editor > Saved queries, then select the log-auditing workgroup.
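The saved queries can also be executed without the console; a sketch with boto3 that looks up product-events-by-date in the log-auditing workgroup and runs it there (the results land in the workgroup's configured output location):

import boto3

# a minimal sketch: find one saved query in the log-auditing workgroup and run it
athena_client = boto3.client('athena')
for query_id in athena_client.list_named_queries(WorkGroup='log-auditing')['NamedQueryIds']:
    named_query = athena_client.get_named_query(NamedQueryId=query_id)['NamedQuery']
    if named_query['Name'] == 'product-events-by-date':
        execution = athena_client.start_query_execution(
            QueryString=named_query['QueryString'],
            QueryExecutionContext={'Database': named_query['Database']},
            WorkGroup='log-auditing')
        print(execution['QueryExecutionId'])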