Python 如何使用Wait功能在Boto3中验证S3存储桶中不存在某个键?
问题陈述 -使用Python的boto3库和waiters功能检查存储桶中是否存在指定的键,例如检查Bucket_1中是否不存在test1.zip键。
阅读更多:Python 教程
解决此问题的方法/算法
步骤1 -导入boto3库和botocore异常以处理异常。
步骤2 -函数的两个参数是bucket_name和key。
步骤3 -使用boto3库创建AWS会话。
步骤4 -为S3创建AWS客户端。
步骤5 -现在使用get_waiter函数创建object_not_exists的waiter对象。
步骤6 -使用waiter对象验证指定的存储桶中是否存在键。默认情况下,它每5秒检查一次,直到达到成功状态为止。在20次检查失败后,会返回错误。但是,用户可以定义轮询时间和最大尝试次数。
步骤7 -它返回None。
步骤8 -如果在检查存储桶时出现问题,则处理通用异常。
示例
使用以下代码使用waiter检查是否在存储桶中存在键-
import boto3
from botocore.exceptions import ClientError
def use_waiters_check_object_not_exists(bucket_name, key_name):
session = boto3.session.Session()
s3_client = session.client('s3')
try:
waiter = s3_client.get_waiter('object_not_exists')
waiter.wait(Bucket=bucket_name, Key = key_name,
WaiterConfig={
'Delay': 2, 'MaxAttempts': 5})
print('Object does not exist: ' + bucket_name +'/'+key_name)
except ClientError as e:
raise Exception( "boto3 client error in use_waiters_check_object_not_exists: " + e.__str__())
except Exception as e:
raise Exception( "Unexpected error in use_waiters_check_object_not_exists: " + e.__str__())
print(use_waiters_check_object_exists("Bucket_1","testfolder/test1.zip"))
print(use_waiters_check_object_exists("Bucket_1","testfolder/test.zip"))
输出
Object does not exist: Bucket_1/testfolder/test1.zip
None
botocore.exceptions.WaiterError: Waiter ObjectNotExists failed: Max attempts exceeded
"Unexpected error in use_waiters_check_object_not_exists: " + e.__str__())
Exception: Unexpected error in use_waiters_check_object_not_exists:
Waiter ObjectNotExists failed: Max attempts exceed
对于Bucket_1/testfolder/test1.zip,输出是打印语句和None。因为响应不返回任何内容,所以打印了None。
对于Bucket_1/testfolder/test.zip,输出是异常,因为此对象存在。
在异常中,可以看到Max attempts exceed。
极客教程