app-MAIL-temp/app/s3.py

51 lines
1.2 KiB
Python

from io import BytesIO
import boto3
import requests
from app.config import (
AWS_REGION,
BUCKET,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
AVATAR_URL_EXPIRATION,
)
# Module-level boto3 session built from the credentials and region imported
# from app.config; the S3 helpers below obtain their resource/client from it.
session = boto3.Session(
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
def upload_from_bytesio(key: str, bs: BytesIO, content_type="string") -> None:
    """Upload the contents of an in-memory buffer to the configured bucket.

    Args:
        key: Object key to write within ``BUCKET``.
        bs: Buffer holding the payload. It is rewound to offset 0 before the
            upload, so the whole buffer is sent regardless of its current
            position.
        content_type: Value passed through as the object's ``ContentType``.
    """
    # Rewind first: a buffer that was just written to sits at its end.
    bs.seek(0)
    bucket = session.resource("s3").Bucket(BUCKET)
    bucket.put_object(Key=key, Body=bs, ContentType=content_type)
def upload_from_url(url: str, upload_path, timeout: float = 10) -> None:
    """Download ``url`` and store the response body in S3 under ``upload_path``.

    Args:
        url: Address to fetch with an HTTP GET.
        upload_path: Object key the downloaded bytes are written to.
        timeout: Seconds to wait on the remote server before giving up.
            Without a timeout ``requests`` may block indefinitely on an
            unresponsive host.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with a 4xx/5xx status.
    """
    r = requests.get(url, timeout=timeout)
    # Fail loudly on an HTTP error instead of uploading the error page body
    # as if it were the requested file.
    r.raise_for_status()
    upload_from_bytesio(upload_path, BytesIO(r.content))
def delete_file(key: str) -> None:
    """Delete the object stored under ``key`` in the configured bucket."""
    bucket = session.resource("s3").Bucket(BUCKET)
    bucket.Object(key).delete()
def get_url(key: str, expires_in=3600) -> str:
    """Return a presigned ``get_object`` URL for ``key`` in the configured bucket.

    Args:
        key: Object key the URL should point at.
        expires_in: Value passed as ``ExpiresIn`` to boto3 (URL lifetime;
            boto3 documents this as a number of seconds).
    """
    client = session.client("s3")
    target = {"Bucket": BUCKET, "Key": key}
    return client.generate_presigned_url(
        ClientMethod="get_object",
        Params=target,
        ExpiresIn=expires_in,
    )
if __name__ == "__main__":
    # Manual smoke test: upload a local file, then print a presigned URL for it.
    with open("/tmp/1.png", "rb") as f:
        upload_from_bytesio("1.png", BytesIO(f.read()))
    # get_url takes the object key only; the bucket is already fixed inside it.
    # Passing BUCKET here would sign a URL for the wrong key and send "1.png"
    # as the expiry value.
    print(get_url("1.png"))