Python 如何使用Wait功能在Boto3中验证S3存储桶中不存在某个键?

Python 如何使用Wait功能在Boto3中验证S3存储桶中不存在某个键?

问题陈述 -使用Python的boto3库和waiters功能检查存储桶中是否存在指定的键,例如检查Bucket_1中是否不存在test1.zip键。

阅读更多:Python 教程

解决此问题的方法/算法

步骤1 -导入boto3库和botocore异常以处理异常。

步骤2 -函数的两个参数是bucket_name和key。

步骤3 -使用boto3库创建AWS会话。

步骤4 -为S3创建AWS客户端。

步骤5 -现在使用get_waiter函数创建object_not_exists的waiter对象。

步骤6 -使用waiter对象验证指定的存储桶中是否存在键。默认情况下,它每5秒检查一次,直到达到成功状态为止。在20次检查失败后,会返回错误。但是,用户可以定义轮询时间和最大尝试次数。

步骤7 -它返回None。

步骤8 -如果在检查存储桶时出现问题,则处理通用异常。

示例

使用以下代码使用waiter检查是否在存储桶中存在键-

import boto3
from botocore.exceptions import ClientError

def use_waiters_check_object_not_exists(bucket_name, key_name):
   session = boto3.session.Session()
   s3_client = session.client('s3')
   try:
      waiter = s3_client.get_waiter('object_not_exists')
      waiter.wait(Bucket=bucket_name, Key = key_name,
                  WaiterConfig={
                     'Delay': 2, 'MaxAttempts': 5})
      print('Object does not exist: ' + bucket_name +'/'+key_name)
   except ClientError as e:
      raise Exception( "boto3 client error in use_waiters_check_object_not_exists: " + e.__str__())
   except Exception as e:
      raise Exception( "Unexpected error in use_waiters_check_object_not_exists: " + e.__str__())

print(use_waiters_check_object_exists("Bucket_1","testfolder/test1.zip"))
print(use_waiters_check_object_exists("Bucket_1","testfolder/test.zip"))

输出

Object does not exist: Bucket_1/testfolder/test1.zip
None

botocore.exceptions.WaiterError: Waiter ObjectNotExists failed: Max attempts exceeded
"Unexpected error in use_waiters_check_object_not_exists: " + e.__str__())
Exception: Unexpected error in use_waiters_check_object_not_exists:
Waiter ObjectNotExists failed: Max attempts exceed

对于Bucket_1/testfolder/test1.zip,输出是打印语句和None。因为响应不返回任何内容,所以打印了None。

对于Bucket_1/testfolder/test.zip,输出是异常,因为此对象存在。

在异常中,可以看到Max attempts exceed。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程