import io from minio import Minio from cache import make_build_key class MinioCache: def __init__(self, endpoint, access, secret, bucket="fester-cache"): self.client = Minio( endpoint, access_key=access, secret_key=secret, secure=False ) self.bucket = bucket if not self.client.bucket_exists(bucket): self.client.make_bucket(bucket) def put(self, key, data): stream = io.BytesIO(data.encode() if isinstance(data, str) else data) self.client.put_object( self.bucket, key, stream, length=len(stream.getvalue()) ) def get(self, key): try: response = self.client.get_object(self.bucket, key) return response.read() except: return None def exists(self, key): try: self.client.stat_object(self.bucket, key) return True except: return False