AWS (Study Notes, Lesson 32): Using CDK in Depth (API Gateway + EventBridge)

  • Use CDK to create AWS API Gateway + Lambda, EventBridge, and related resources

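What this lesson covers:

  • Using AWS API Gateway + Lambda
  • Practicing the producer/consumer pattern with EventBridge

1. Using AWS API Gateway + Lambda

1.1. Earlier exercise

  • Earlier example

    An earlier example practiced AWS API Gateway + Lambda by creating the resources by hand.

  • Creating them with CDK

    Here, API Gateway + Lambda are created with CDK instead.

1.2. Creating API Gateway + Lambda with CDK

  • Overall architecture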

    https://i-blog.csdnimg.cn/direct/a76ef16babc04594aaa6078adde5eebf.png

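  • Code walkthrough

    • Create the Lambda function

          # Assumes: from aws_cdk import aws_lambda as _lambda
          base_lambda = _lambda.Function(
              self, 'ApiCorsLambda',
              handler='lambda-handler.handler',
              runtime=_lambda.Runtime.PYTHON_3_12,
              code=_lambda.Code.from_asset('lambda'))

      Note that no VPC is created here, because this function does not need one to be created explicitly.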

      https://i-blog.csdnimg.cn/direct/0333c15d48a34a59916cae985b8cc4aa.png

      • Create the API and add a resource

            # Assumes: from aws_cdk import aws_apigateway as _apigw
            base_api = _apigw.RestApi(self, 'ApiGatewayWithCors',
                                      rest_api_name='ApiGatewayWithCors')

            # Add /example and let API Gateway answer the CORS preflight (OPTIONS).
            example_entity = base_api.root.add_resource(
                'example',
                default_cors_preflight_options=_apigw.CorsOptions(
                    allow_methods=['GET', 'OPTIONS'],
                    allow_origins=_apigw.Cors.ALL_ORIGINS)
            )

        https://i-blog.csdnimg.cn/direct/e710beb9e1e64ddda60a971339ad82e8.png

      • Create a LambdaIntegration to bind the API to the Lambda function

            # Non-proxy integration: API Gateway adds the CORS header to the 200 response.
            example_entity_lambda_integration = _apigw.LambdaIntegration(
                base_lambda,
                proxy=False,
                integration_responses=[
                    _apigw.IntegrationResponse(
                        status_code="200",
                        response_parameters={
                            'method.response.header.Access-Control-Allow-Origin': "'*'"
                        }
                    )
                ]
            )

        https://i-blog.csdnimg.cn/direct/fdb012c72aae4609bf3a269b6928a11f.png

      • Add a method to the API

            # Declare the header on the method response so the integration may set it.
            example_entity.add_method(
                'GET', example_entity_lambda_integration,
                method_responses=[
                    _apigw.MethodResponse(
                        status_code="200",
                        response_parameters={
                            'method.response.header.Access-Control-Allow-Origin': True
                        }
                    )
                ]
            )

        https://i-blog.csdnimg.cn/direct/92e3cdfba1db4081909519e173a26a07.png
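The stack points at a handler named `lambda-handler.handler` in the `lambda` directory, but the post does not show that file. Below is a minimal sketch of what it could look like; the returned message is a hypothetical placeholder. Because the integration is created with `proxy=False`, the function's return value is passed through as the response body, and the `Access-Control-Allow-Origin` header comes from the integration response mapping rather than from the function. The content of `event` depends on request mapping templates, which this stack does not define, so the sketch ignores it.

```python
# Hypothetical contents of lambda/lambda-handler.py (not shown in the post).
import json


def handler(event, context):
    # With a non-proxy integration the return value is serialized to JSON and
    # used as the response body; API Gateway supplies status code and headers.
    return {"message": "Hello from ApiCorsLambda"}


if __name__ == "__main__":
    # Local smoke test: no AWS resources are needed to call the function.
    print(json.dumps(handler({}, None)))
```

With a proxy integration (`proxy=True`, the CDK default) the function would instead have to return `statusCode`, `headers`, and `body` itself.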

1.3. Verifying the API Gateway + Lambda created by CDK

  • Deploy the CDK app

        cdk --require-approval never deploy

  • Check what was created

    • The Lambda function

      https://i-blog.csdnimg.cn/direct/1d992e3fe5144b6896335bfc624e0638.png

    • The API

      https://i-blog.csdnimg.cn/direct/9c3afc3912954086b816f40d3dc80ce6.png

    • Look up the API's invoke URL

      https://i-blog.csdnimg.cn/direct/54aea8a3cb6c4e3bb11d336b19a7c7b4.png

    • Open the API's invoke URL

      Do not forget to run cdk destroy afterwards.

      https://i-blog.csdnimg.cn/direct/00cf9499f88c4c6d8051ebdb252f16f5.png
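The invoke URL shown in the console follows a fixed pattern, so it can also be assembled by hand. A small sketch, assuming the stage name is `prod` (the default that CDK's `RestApi` deploys to when no stage name is given); the API id and region below are placeholders, not values from this deployment.

```python
def invoke_url(api_id: str, region: str, resource: str, stage: str = "prod") -> str:
    """Build the public URL of a resource on a regional REST API."""
    path = resource.lstrip("/")
    return f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/{path}"


# Placeholder id and region; take the real ones from the console or the
# output that cdk deploy prints.
print(invoke_url("abc123defg", "ap-northeast-1", "example"))
```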

2. Practicing the producer/consumer pattern with EventBridge

2.1. Code link

This section practices the producer/consumer pattern with EventBridge.

2.2. Starting the exercise

  • Overall architecture

    https://i-blog.csdnimg.cn/direct/8764adf8995c4b39a13b9740970a6b81.png

  • Code walkthrough

    • Create the producer

          # Assumes: from aws_cdk import aws_lambda as _lambda, aws_iam as iam
          #
          # Producer Lambda
          #
          event_producer_lambda = _lambda.Function(
              self, "eventProducerLambda",
              runtime=_lambda.Runtime.PYTHON_3_12,
              handler="event_producer_lambda.lambda_handler",
              code=_lambda.Code.from_asset("lambda"))

          # Allow the producer to call PutEvents on EventBridge.
          event_policy = iam.PolicyStatement(
              effect=iam.Effect.ALLOW,
              resources=['*'],
              actions=['events:PutEvents'])

          event_producer_lambda.add_to_role_policy(event_policy)

      The producer is granted the events:PutEvents permission because it later calls PutEvents on EventBridge.

      https://i-blog.csdnimg.cn/direct/a2368cad51bc4dd889b92ae7c24395f6.png

    • The producer's handler code

          import datetime
          import json
          import logging

          import boto3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)


          def lambda_handler(event, context):
              eventbridge_client = boto3.client('events')
              # API Gateway passes the raw request body as a string (or None).
              request_body = event["body"]
              if request_body is None:
                  request_body = ""
              # Structure of the EventBridge event; Detail must be a JSON string.
              eventbridge_event = {
                  'Time': datetime.datetime.now(),
                  'Source': 'com.mycompany.myapp',
                  'Detail': request_body,
                  'DetailType': 'service_status'
              }
              logger.info(eventbridge_event)

              # Send the event to EventBridge
              response = eventbridge_client.put_events(
                  Entries=[
                      eventbridge_event
                  ]
              )

              logger.info(response)

              # Return a success response to API Gateway
              return {
                  "statusCode": 200,
                  "body": json.dumps({
                      "result": "from Producer"
                  }),
              }

      This uses boto3, the official AWS SDK for Python, which lets code interact with and manage AWS services. Here boto3 is used to call PutEvents on EventBridge.
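One thing worth knowing about `put_events`: a rejected entry does not raise an exception. The call still succeeds and the problem is reported per entry in the response (`FailedEntryCount`, plus `ErrorCode` and `ErrorMessage` on the failed entries). The sketch below shows one way the handler could validate the body before sending and inspect the response afterwards. The helper names `build_entry` and `failed_entries` are hypothetical and not part of the original code, and the check assumes that EventBridge expects `Detail` to be a JSON object.

```python
import json


def build_entry(request_body, source="com.mycompany.myapp",
                detail_type="service_status"):
    """Return a PutEvents entry, or raise ValueError for an unusable body."""
    try:
        detail = json.loads(request_body or "")
    except json.JSONDecodeError as exc:
        raise ValueError("request body must be a JSON object") from exc
    if not isinstance(detail, dict):
        raise ValueError("request body must be a JSON object")
    return {
        "Source": source,
        "DetailType": detail_type,
        "Detail": json.dumps(detail),  # Detail is sent as a JSON string
    }


def failed_entries(response):
    """Return the per-entry results of a put_events response that failed."""
    return [entry for entry in response.get("Entries", [])
            if "ErrorCode" in entry]


if __name__ == "__main__":
    print(build_entry('{"item1":"123","item2":"234"}'))
```

In the handler, the result of `build_entry(event["body"])` would go into `Entries=[...]`, and a non-empty `failed_entries(response)` could be logged or turned into an error status.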

    • consumer1 and consumer2

          # Assumes: from aws_cdk import aws_events as events,
          #          aws_events_targets as targets
          #
          # Approved Consumer1
          #
          event_consumer1_lambda = _lambda.Function(
              self, "eventConsumer1Lambda",
              runtime=_lambda.Runtime.PYTHON_3_8,
              handler="event_consumer_lambda.lambda_handler",
              code=_lambda.Code.from_asset("lambda"))

          # Match every event whose source is com.mycompany.myapp.
          event_consumer1_rule = events.Rule(
              self, 'eventConsumer1LambdaRule',
              description='Approved Transactions',
              event_pattern=events.EventPattern(source=['com.mycompany.myapp']))

          event_consumer1_rule.add_target(
              targets.LambdaFunction(handler=event_consumer1_lambda))

          #
          # Approved Consumer2
          #
          event_consumer2_lambda = _lambda.Function(
              self, "eventConsumer2Lambda",
              runtime=_lambda.Runtime.PYTHON_3_8,
              handler="event_consumer_lambda.lambda_handler",
              code=_lambda.Code.from_asset("lambda"))

          event_consumer2_rule = events.Rule(
              self, 'eventConsumer2LambdaRule',
              description='Approved Transactions',
              event_pattern=events.EventPattern(source=['com.mycompany.myapp']))
          event_consumer2_rule.add_target(
              targets.LambdaFunction(handler=event_consumer2_lambda))

      consumer1 and consumer2 are alike: each one simply logs the event it receives from EventBridge.

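          import json
          import logging

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)


          def lambda_handler(event, context):
              # Log the whole EventBridge event so it shows up in CloudWatch Logs.
              logger.info(event)

              return {
                  "statusCode": 200,
                  "body": json.dumps({
                      "result": "testing..."
                  }),
              }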

      https://i-blog.csdnimg.cn/direct/b5d1ab08cfb04778b5508e0767244a42.png
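All the rules in this exercise use the same pattern, `source=['com.mycompany.myapp']`, so every event from the producer fans out to every consumer. The sketch below illustrates the idea with a deliberately simplified matcher: it handles only exact values and nested fields, whereas real EventBridge patterns support more (prefix, numeric, and other operators). The function `matches` is a hypothetical illustration, and the id, account, time, and region in the sample event are placeholders.

```python
def matches(pattern, event):
    """Simplified EventBridge-style matching: exact values and nested fields."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern, for example {"detail": {"item1": ["123"]}}
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # A leaf pattern is a list of allowed values.
            return False
    return True


# What events.EventPattern(source=[...]) amounts to as a plain pattern.
rule_pattern = {"source": ["com.mycompany.myapp"]}

# Shape of the envelope a consumer receives; field values are placeholders.
sample_event = {
    "version": "0",
    "id": "00000000-0000-0000-0000-000000000000",
    "detail-type": "service_status",
    "source": "com.mycompany.myapp",
    "account": "123456789012",
    "time": "2025-01-01T00:00:00Z",
    "region": "ap-northeast-1",
    "resources": [],
    "detail": {"item1": "123", "item2": "234"},
}

print(matches(rule_pattern, sample_event))  # → True
```

To route only some events to a consumer, a narrower pattern such as `{"source": [...], "detail-type": ["service_status"]}` could be used in the same way.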

    • consumer3 receives the event through Kinesis Firehose and saves it to an S3 bucket

          # Assumes: from aws_cdk import aws_s3 as s3,
          #          aws_kinesisfirehose as _firehose
          #
          # Approved Consumer3
          #

          # Create S3 bucket for KinesisFirehose destination
          ingest_bucket = s3.Bucket(self, 'test-ngest-bucket')

          # Create a Role for KinesisFirehose
          firehose_role = iam.Role(
              self, 'myRole',
              assumed_by=iam.ServicePrincipal('firehose.amazonaws.com'))

          # Create and attach a policy that grants write access to the S3 bucket.
          iam.Policy(
              self, 's3_attr',
              policy_name='s3kinesis',
              statements=[iam.PolicyStatement(
                  actions=['s3:*'],
                  resources=['arn:aws:s3:::' + ingest_bucket.bucket_name + '/*'])],
                  # resources=['*'])],
              roles=[firehose_role],
          )

          # Delivery stream that buffers for 60 seconds, then writes to the bucket.
          event_consumer3_kinesisfirehose = _firehose.CfnDeliveryStream(
              self, "consumer3-firehose",
              s3_destination_configuration=_firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
                  bucket_arn=ingest_bucket.bucket_arn,
                  buffering_hints=_firehose.CfnDeliveryStream.BufferingHintsProperty(
                      interval_in_seconds=60
                  ),
                  compression_format="UNCOMPRESSED",
                  role_arn=firehose_role.role_arn
              ))

          event_consumer3_rule = events.Rule(
              self, 'eventConsumer3KinesisRule',
              description='Approved Transactions',
              event_pattern=events.EventPattern(source=['com.mycompany.myapp']))
          event_consumer3_rule.add_target(
              targets.KinesisFirehoseStream(stream=event_consumer3_kinesisfirehose))

      https://i-blog.csdnimg.cn/direct/53e51e85ed3e423ca4f0ddafd9898dfe.png

    • Expose the producer through API Gateway

          # Assumes: from aws_cdk import aws_apigateway as api_gw
          # Defines an API Gateway REST API backed by the "event_producer_lambda" function.
          api = api_gw.LambdaRestApi(
              self, 'SampleAPI-EventBridge-Multi-Consumer',
              handler=event_producer_lambda,
              proxy=False)
          items = api.root.add_resource("items")
          items.add_method("POST")  # POST /items

2.3. Verifying the deployment

Next, run cdk deploy and check the result.

  • One producer and two consumers, all Lambda functions

    https://i-blog.csdnimg.cn/direct/dea7ea6265da44a2a40161e514324eed.png

  • The third consumer, a Kinesis Firehose delivery stream

    https://i-blog.csdnimg.cn/direct/0fa5b56075e04e2f86c26b6f23963404.png

  • The API and the producer

    https://i-blog.csdnimg.cn/direct/07a8e7a84a4d4dd9aeb1459eac9e3644.png

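2.4. Testing the deployment

  • Make a test call to the API

    The call returns a normal result. Note that the request body must be set; here it is set as follows.

    {"item1":"123","item2":"234"}

    Calling the API starts the producer Lambda, which then calls PutEvents on EventBridge.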

    https://i-blog.csdnimg.cn/direct/203280f5dd6949789d7c4aa0d45362f9.png
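The same test call could also be made from a script instead of the console. A minimal sketch using only the standard library; the URL is a placeholder that must be replaced with the real invoke URL of the deployed stage, and `build_post` is a hypothetical helper name. The request is only built here, and the line that would send it is left commented out.

```python
import json
import urllib.request


def build_post(url, payload):
    """Build (but do not send) a JSON POST request."""
    data = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Placeholder URL: replace with the invoke URL of the deployed API.
request = build_post(
    "https://example.execute-api.ap-northeast-1.amazonaws.com/prod/items",
    {"item1": "123", "item2": "234"},
)
print(request.get_method(), request.full_url)

# Uncomment to actually call the deployed API:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```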

  • Check consumer1 and consumer2

    https://i-blog.csdnimg.cn/direct/c53c77e86c3e483fb43dd1f719989114.png

  • Check consumer3

    Here the main thing to check is the S3 bucket. The event travels along this path:

    producer -> event -> eventbridge -> consumer3 -> kinesis firehose -> S3 bucket

    Finally, run cdk destroy.

https://i-blog.csdnimg.cn/direct/ad1474c3014648cba5b6e1c6f7ad5640.png
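When inspecting the objects in the bucket, keep in mind that Firehose writes everything it buffered (here for up to 60 seconds) into a single S3 object. As far as I know it does not insert a delimiter between records, so one object may contain several JSON events back to back. The sketch below splits such content into individual events; it assumes the object has already been downloaded and decoded into a string, and the sample text is made up for illustration.

```python
import json


def split_records(text):
    """Yield each JSON document found in a string of concatenated documents."""
    decoder = json.JSONDecoder()
    position = 0
    while position < len(text):
        # Skip whitespace or newlines between documents, if there are any.
        while position < len(text) and text[position].isspace():
            position += 1
        if position >= len(text):
            break
        document, position = decoder.raw_decode(text, position)
        yield document


# Made-up sample: two events written back to back, as in one S3 object.
sample = ('{"source": "com.mycompany.myapp", "detail": {"item1": "123"}}'
          '{"source": "com.mycompany.myapp", "detail": {"item1": "456"}}')

for record in split_records(sample):
    print(record["detail"]["item1"])
```

The same function also works when the records happen to be separated by newlines, because whitespace between documents is skipped.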