如何使用Boto3和AWS Client的waiter功能来检查未经授权的Bucket是否存在?
问题陈述 - 使用Python中的boto3库使用waiter功能检查桶是否不存在。 例如,使用waiter检查S3中是否不存在Bucket_2。
更多Python相关文章,请阅读:Python 教程
解决这个问题的方法/算法
步骤1 - 导入boto3和botocore异常以处理异常。
步骤2 - 将bucket_name用作函数中的参数。
步骤3 - 使用boto3库创建AWS会话。
步骤4 - 为S3创建AWS客户端。
步骤5 - 现在,使用get_waiter函数创建 bucket_not_exists 的wait对象。
步骤6 - 现在,使用 wait 对象来验证Bucket是否不存在。 默认情况下,它会每5秒钟检查一次,直到达到成功状态。 日成功状态意味着桶不应存在于S3中。 在20次失败的检查后,将返回错误。 但是,用户可以定义轮询时间和最大重试次数。
步骤7 - 它返回None。
步骤8 - 如果在检查桶时出现问题,则处理一般异常。
示例
使用以下代码使用waiter检查Bucket是否不存在 –
import boto3
from botocore.exceptions import ClientError
def use_waiters_check_bucket_not_exists(bucket_name):
session = boto3.session.Session()
s3_client = session.client('s3')
try:
waiter = s3_client.get_waiter('bucket_not_exists')
waiter.wait(Bucket=bucket_name,
WaiterConfig={
'Delay': 2, 'MaxAttempts': 5})
print('Bucket not exist: ' + bucket_name)
except ClientError as e:
raise Exception( "boto3 client error in use_waiters_check_bucket_not_exists: " + e.__str__())
except Exception as e:
raise Exception( "Unexpected error in use_waiters_check_bucket_not_exists: " + e.__str__())
print(use_waiters_check_bucket_not_exists("Bucket_2"))
print(use_waiters_check_bucket_not_exists("Bucket_1"))
输出
Bucket not exist: Bucket_2
None
botocore.exceptions.WaiterError: Waiter BucketNotExists failed: Max
attempts exceeded
"Unexpected error in use_waiters_check_bucket_not_exists: " +
e.__str__())
Exception: Unexpected error in use_waiters_check_bucket_not_exists:
Waiter BucketNotExists failed: Max attempts exceed
对于Bucket_2,输出是打印语句和None。 由于响应没有返回任何内容,因此打印出None。
对于Bucket_1,输出是一个异常,因为即使经过了最大尝试次数,该桶仍存在。
在异常中,可以读到最大尝试次数超过。
极客教程