aws (Study Notes, Lesson 33) Using cdk in Depth: Practicing aws athena

aws (Study Notes, Lesson 33) Using cdk in Depth

  • Use cdk to create athena and an aws glue crawler

What you will learn:

  • Using aws athena together with an aws glue crawler

1. Using aws athena

1.1 What is aws athena

aws athena is a data analytics service provided by aws; it lets you analyze data stored in S3 using SQL.

  • It is a managed service, so there is no infrastructure to maintain.

  • It is built on open-source frameworks.

  • You are charged based on the amount of data a query processes.

  • Data encryption is supported.

    Note: it cannot JOIN against an RDB, so it only queries data files such as csv and json.

1.2 What is aws glue

aws glue is a managed ETL service provided by aws. It makes preparing and loading data for analysis simple, and the metadata associated with tables and schemas is stored as aws glue catalog data.

1.3 Why use aws athena together with aws glue

When aws athena is combined with aws glue, the databases and schemas created by aws glue can be queried directly with aws athena.
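
To make that relationship concrete, here is a minimal sketch (not part of the article's stack code) that uses boto3 to list the tables a glue database exposes. The database name log-database is the one created later in this article, and default credentials/region are assumed.

import boto3

# Athena reads table and schema metadata from the Glue Data Catalog,
# so listing a Glue database shows exactly what Athena will see.
glue_client = boto3.client('glue')

response = glue_client.get_tables(DatabaseName='log-database')
for table in response['TableList']:
    columns = [col['Name'] for col in table['StorageDescriptor']['Columns']]
    print(table['Name'], columns)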

2. Practicing aws athena

2.1 Code link

2.2 Overall architecture

https://i-blog.csdnimg.cn/direct/dfc8b5a09e7842dfa2fb813a706ee166.png

2.3 Code walkthrough

2.3.1 Create the S3 bucket for the test data
        # creating the bucket where the logs will be placed
        logs_bucket = s3.Bucket(self, 'logs-bucket',
                                bucket_name=f"auditing-logs-{self.account}",
                                removal_policy=RemovalPolicy.DESTROY,
                                auto_delete_objects=True)

https://i-blog.csdnimg.cn/direct/5e92bd3516f647cb9895b5f28f9f2845.png

2.3.2 Create the S3 bucket that stores the query results
        # creating the bucket where the queries output will be placed
        query_output_bucket = s3.Bucket(self, 'query-output-bucket',
                                        bucket_name=f"auditing-analysis-output-{self.account}",
                                        removal_policy=RemovalPolicy.DESTROY,
                                        auto_delete_objects=True)

https://i-blog.csdnimg.cn/direct/cf2d6066d3e44b8ba68387a56789cc6c.png

2.3.3 Upload the sample json data files to the S3 bucket
        # uploading the log files to the bucket as examples
        s3_deployment.BucketDeployment(self, 'sample-files',
                                       destination_bucket=logs_bucket,
                                       sources=[s3_deployment.Source.asset('./log-samples')],
                                       content_type='application/json',
                                       retain_on_delete=False)

https://i-blog.csdnimg.cn/direct/e0702ff851b4484b95f46132181fc7b1.png
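
The contents of the ./log-samples folder are not reproduced in this article; judging from the columns referenced by the named queries later on (transactionid, userid, username, domain, datetime, action, date), one record presumably looks roughly like the sketch below. This is only an illustrative guess, not the actual sample data.

import json

# Hypothetical shape of one log record, inferred from the query columns.
sample_event = {
    "transactionid": "a1b2c3",
    "userid": "123",
    "username": "alice",
    "domain": "products",
    "datetime": "2024-01-19T10:15:00Z",
    "action": "view",
    "date": "2024-01-19",
}

# Glue crawlers classify JSON most reliably as one object per line (JSON Lines).
print(json.dumps(sample_event))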

2.3.4 Create the aws glue CfnDatabase
        # creating the Glue Database to serve as our Data Catalog
        glue_database = glue.CfnDatabase(self, 'log-database',
                                         catalog_id=self.account,
                                         database_input=glue.CfnDatabase.DatabaseInputProperty(
                                             name="log-database"))

https://i-blog.csdnimg.cn/direct/bca8edc891d74e7bba883a934b11da54.png

2.3.5 Create the IAM Role needed by the aws glue crawler
        # creating the permissions for the crawler to enrich our Data Catalog
        glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                                     role_name='glue-crawler-role',
                                     assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                                     managed_policies=[
                                         # Remember to apply the Least Privilege Principle and provide only the permissions needed to the crawler
                                         iam.ManagedPolicy.from_managed_policy_arn(self, 'AmazonS3FullAccess',
                                                                                   'arn:aws:iam::aws:policy/AmazonS3FullAccess'),
                                         iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                                   'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                                     ])

Two managed policies are needed here: AmazonS3FullAccess and AWSGlueServiceRole.

https://i-blog.csdnimg.cn/direct/6ea739e38ed94b2da78ad25d1b5b7a39.png
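
As the comment in the code above points out, AmazonS3FullAccess is broader than the crawler really needs. A hedged sketch of a tighter alternative (not the original stack code) would drop that managed policy and instead grant read access to the logs bucket only:

        # sketch: scope the crawler's S3 access down to the logs bucket only
        glue_crawler_role.add_to_policy(iam.PolicyStatement(
            actions=["s3:GetObject", "s3:ListBucket"],
            resources=[logs_bucket.bucket_arn,
                       f"{logs_bucket.bucket_arn}/*"]))
        # (logs_bucket.grant_read(glue_crawler_role) would achieve the same thing)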

2.3.6 Create the aws glue crawler
        # creating the Glue Crawler that will automatically populate our Data Catalog. Don't forget to run the crawler
        # as soon as the deployment finishes, otherwise our Data Catalog will be empty. Check out the README for more instructions
        glue.CfnCrawler(self, 'logs-crawler',
                        name='logs-crawler',
                        database_name=glue_database.database_input.name,
                        role=glue_crawler_role.role_name,
                        targets={
                            "s3Targets": [
                                {"path": f's3://{logs_bucket.bucket_name}/products'},
                                {"path": f's3://{logs_bucket.bucket_name}/users'}
                            ]
                        })

Here, the aws glue crawler performs the ETL (Extract, Transform, Load) preparation: it crawls the products and users json data files in the S3 bucket and loads the inferred table definitions into the glue database, so that the data can then be queried.

https://i-blog.csdnimg.cn/direct/559ab73e241c4cd481f0ac6d16834a34.png

2.3.7 Create the aws athena work group
        # creating the Athena Workgroup to store our queries
        work_group = athena.CfnWorkGroup(self, 'log-auditing-work-group',
                                         name='log-auditing',
                                         work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
                                             result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
                                                 output_location=f"s3://{query_output_bucket.bucket_name}",
                                                 encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                                                     encryption_option="SSE_S3"
                                                 ))))

https://i-blog.csdnimg.cn/direct/af3caa0c11d74ac89ecf9b397e18eeec.png

aws athena is managed through work groups; once the workgroup has been created, the queries are then created inside it.

2.3.8 Create the aws athena queries
        # creating an example query to fetch all product events by date
        product_events_by_date_query = athena.CfnNamedQuery(self, 'product-events-by-date-query',
                                                            database=glue_database.database_input.name,
                                                            work_group=work_group.name,
                                                            name="product-events-by-date",
                                                            query_string="SELECT * FROM \"log-database\".\"products\" WHERE \"date\" = '2024-01-19'")

        # creating an example query to fetch all user events by date
        user_events_by_date_query = athena.CfnNamedQuery(self, 'user-events-by-date-query',
                                                         database=glue_database.database_input.name,
                                                         work_group=work_group.name,
                                                         name="user-events-by-date",
                                                         query_string="SELECT * FROM \"log-database\".\"users\" WHERE \"date\" = '2024-01-22'")

        # creating an example query to fetch all events by the user ID
        all_events_by_userid_query = athena.CfnNamedQuery(self, 'all-events-by-userId-query',
                                                          database=glue_database.database_input.name,
                                                          work_group=work_group.name,
                                                          name="all-events-by-userId",
                                                          query_string="SELECT * FROM (\n"
                                                                       "    SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"products\" \n"
                                                                       "UNION \n"
                                                                       "    SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"users\" \n"
                                                                       ") WHERE \"userid\" = '123'")
2.3.9 Adjust the resource creation order
        # adjusting the resource creation order
        product_events_by_date_query.add_dependency(work_group)
        user_events_by_date_query.add_dependency(work_group)
        all_events_by_userid_query.add_dependency(work_group)
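
The explicit dependencies are needed because work_group.name is just a plain string, so cdk cannot infer that the named queries depend on the workgroup; add_dependency adds a DependsOn to the generated template and guarantees the workgroup is created before the queries.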

2.4 Running aws cdk for athena

2.4.1 Run the deployment
python -m venv .venv
source .venv/Scripts/activate # windows platform
pip install -r requirements.txt
cdk synth
cdk --require-approval never deploy
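
Note: BucketDeployment uploads the sample files as cdk assets, so the target account/region must already be bootstrapped; if it has never been bootstrapped, run cdk bootstrap once before cdk deploy.
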
2.4.2 Run the crawler

https://i-blog.csdnimg.cn/direct/767e50b06f5a45749d906f2d61c26274.png

By default the crawler does not start on its own; it has to be run manually.
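
If you prefer the SDK to the console, a minimal sketch like the one below (boto3, not part of the stack code) starts the crawler and waits until it returns to the READY state; logs-crawler is the name given to the CfnCrawler above.

import time
import boto3

glue_client = boto3.client('glue')

# kick off the crawler created by the CDK stack
glue_client.start_crawler(Name='logs-crawler')

# poll until the crawler has gone RUNNING -> STOPPING -> READY again
while glue_client.get_crawler(Name='logs-crawler')['Crawler']['State'] != 'READY':
    time.sleep(15)
print('crawler finished')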

https://i-blog.csdnimg.cn/direct/bcd3c618ceb745cdb0aab1b91c4ef686.png

It finished normally. The data from the json files in the S3 bucket has gone through the ETL step and is now available in the aws glue database.

https://i-blog.csdnimg.cn/direct/36dc2b99c64b44c2a1adba58dffcb4ee.png

2.4.3 View the aws athena queries

AWS Athena

Query editor > Saved queries > Workgroup > log-auditing

https://i-blog.csdnimg.cn/direct/ced480e7a7eb4a6ba48e0500c0f2a49c.png

2.4.4 Run the aws athena queries

https://i-blog.csdnimg.cn/direct/f9b9744d94a04ca2a87a8b97fa9a910c.png
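
The screenshot above runs a saved query from the console; the same query can also be issued through the Athena API. The sketch below (boto3, for illustration only) runs the products-by-date query in the log-auditing workgroup created by the stack and prints how many rows came back.

import time
import boto3

athena_client = boto3.client('athena')

execution = athena_client.start_query_execution(
    QueryString='SELECT * FROM "log-database"."products" WHERE "date" = \'2024-01-19\'',
    QueryExecutionContext={'Database': 'log-database'},
    WorkGroup='log-auditing')
query_id = execution['QueryExecutionId']

# wait for the query to reach a terminal state
while True:
    state = athena_client.get_query_execution(
        QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(2)

if state == 'SUCCEEDED':
    rows = athena_client.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
    print(f"{len(rows)} rows returned (the first row is the header)")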

2.4.5 View the results of the aws athena queries

https://i-blog.csdnimg.cn/direct/e79b65738cf442f187e03276fd89fbec.png