静态存储
功能介绍#
静态存储插件实现向arkid增加静态文件(如图片,证书等)上传的能力,开发者仅需继承基类并重载对应抽象方法即可。
实现思路#
开发者在开发静态存储插件时,仅需继承StorageExtension基类,并重载save_file等抽象函数即可。
示例如下:
class LocalStorageExtension(StorageExtension):
def load(self):
self.register_profile_schema(ProfileSchema)
self.register_api(
"/localstorage/{file_name}",
'GET',
self.get_file,
tenant_path=True,
)
super().load()
def save_file(self, file, f_key, *args, **kwargs):
extension = self.model
storage_path = extension.profile.get('storage_path','./storage/')
p = Path(storage_path) / f_key
if not p.parent.exists():
p.parent.mkdir(parents=True)
with open(p, 'wb') as fp:
for chunk in file.chunks():
fp.write(chunk)
def resolve(self, f_key, tenant, *args, **kwargs):
host = get_app_config().get_frontend_host()
return f'{host}/api/v1/tenant/{tenant.id}/com_longgui_storage_local/localstorage/{f_key}'
def get_file(self, request, tenant_id: str, file_name:str):
""" 本地存储插件获取文件
"""
extension = self.model
storage_path = extension.profile.get('storage_path','./storage/')
file_path = Path(storage_path) / file_name
return FileResponse(
open(file_path, 'rb')
)
def read(self,tenant_id,file_url,**kwargs):
"""读取文件数据
Args:
tenant_id (str): 租户ID
file_url (str): 文件链接
Returns:
bytes: 文件数据
"""
host = get_app_config().get_frontend_host()
useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
file_name = file_url.replace(useless_part, "")
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
file_path = Path(storage_path) / file_name
rs = None
with open(file_path,"rb") as f:
rs = f.read()
return rs
文件存储与读取示例如下:
tenant = request.tenant
data = {
"file": file,
}
extension = Extension.active_objects.filter(
type="storage"
).first()
# 存储文件事件
responses = dispatch_event(Event(tag=SAVE_FILE, tenant=tenant, request=request, packages=extension.package, data=data))
if not responses:
return ErrorDict(ErrorCode.STORAGE_NOT_EXISTS)
useless, (data, extension) = responses[0]
if not data:
return ErrorDict(ErrorCode.STORAGE_FAILED)
#读取文件事件
t_responses = dispatch_event(Event(tag=READ_FILE, tenant=tenant, packages=extension.package, data={"url":data}))
if not t_responses:
return ErrorDict(ErrorCode.STORAGE_NOT_EXISTS)
useless, (data, extension) = t_responses[0]
if not data:
print("读取失败")
抽象函数#
基类定义#
arkid.core.extension.storage.StorageExtension (Extension)
#
Source code in arkid/core/extension/storage.py
class StorageExtension(Extension):
TYPE = "storage"
@property
def type(self):
return StorageExtension.TYPE
def load(self):
super().load()
self.listen_event(SAVE_FILE, self.event_save_file)
self.listen_event(READ_FILE, self.event_read_file)
def event_save_file(self, event, **kwargs):
tenant = event.tenant
file = event.data.get('file', None)
fileurl = event.data.get('fileurl', None)
if fileurl:
import requests
response = requests.get(fileurl, stream=True)
f_key = self.generate_key('temp.jpg')
self.save_file(None, f_key, response, event)
else:
f_key = self.generate_key(file.name)
self.save_file(file, f_key, event)
return self.resolve(f_key, tenant, None, event)
def event_read_file(self,event,**kwargs):
file_url = event.data["url"]
return self.read(tenant_id=event.tenant.id,file_url=file_url,**kwargs)
@abstractmethod
def save_file(self, file, f_key: str, response=None, **kwargs):
"""保存文件
Args:
file (File): 文件对象
f_key (str): 存储文件名称
"""
pass
@abstractmethod
def resolve(self, f_key: str, tenant, **kwargs):
"""生成文件链接
Args:
f_key (str): 存储文件名称
tenant (Tenant): 租户
"""
pass
@abstractmethod
def read(self,file_url: str,**kwargs):
"""通过文件链接读取文件数据
Args:
file_url (str): 文件链接
"""
pass
def generate_key(self, file_name: str):
"""生成存储文件名
Args:
file_name (str): 原始文件名,用于获取文件后缀
Returns:
str: 文件名
"""
key = '{}.{}'.format(
uuid.uuid4().hex,
file_name.split('.')[-1],
)
return key
示例#
extension_root.com_longgui_storage_local.LocalStorageExtension (StorageExtension)
#
Source code in extension_root/com_longgui_storage_local/__init__.py
class LocalStorageExtension(StorageExtension):
def load(self):
self.register_profile_schema(ProfileSchema)
self.register_api(
"/localstorage/{file_name}",
'GET',
self.get_file,
tenant_path=True,
auth=None
)
super().load()
def save_file(self, file, f_key, response=None, *args, **kwargs):
print(f_key)
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
p = Path(storage_path) / f_key
if not p.parent.exists():
p.parent.mkdir(parents=True)
with open(p, 'wb') as fp:
for chunk in file.chunks() if file else response:
fp.write(chunk)
def resolve(self, f_key, tenant, *args, **kwargs):
host = get_app_config().get_frontend_host()
return f'{host}/api/v1/tenant/{tenant.id}/com_longgui_storage_local/localstorage/{f_key}'
def get_file(self, request, tenant_id: str, file_name:str):
""" 本地存储插件获取文件
"""
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
file_path = Path(storage_path) / file_name
return FileResponse(
open(file_path, 'rb')
)
def read(self,tenant_id,file_url,**kwargs):
"""读取文件数据
Args:
tenant_id (str): 租户ID
file_url (str): 文件链接
Returns:
bytes: 文件数据
"""
host = get_app_config().get_frontend_host()
useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
file_name = file_url.replace(useless_part, "")
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
file_path = Path(storage_path) / file_name
rs = None
with open(file_path,"rb") as f:
rs = f.read()
return rs
get_file(self, request, tenant_id, file_name)
#
本地存储插件获取文件
Source code in extension_root/com_longgui_storage_local/__init__.py
load(self)
#
read(self, tenant_id, file_url, **kwargs)
#
读取文件数据
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tenant_id |
str |
租户ID |
required |
file_url |
str |
文件链接 |
required |
Returns:
Type | Description |
---|---|
bytes |
文件数据 |
Source code in extension_root/com_longgui_storage_local/__init__.py
def read(self,tenant_id,file_url,**kwargs):
"""读取文件数据
Args:
tenant_id (str): 租户ID
file_url (str): 文件链接
Returns:
bytes: 文件数据
"""
host = get_app_config().get_frontend_host()
useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
file_name = file_url.replace(useless_part, "")
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
file_path = Path(storage_path) / file_name
rs = None
with open(file_path,"rb") as f:
rs = f.read()
return rs
resolve(self, f_key, tenant, *args, **kwargs)
#
生成文件链接
Parameters:
Name | Type | Description | Default |
---|---|---|---|
f_key |
str |
存储文件名称 |
required |
tenant |
Tenant |
租户 |
required |
save_file(self, file, f_key, response=None, *args, **kwargs)
#
保存文件
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
File |
文件对象 |
required |
f_key |
str |
存储文件名称 |
required |
Source code in extension_root/com_longgui_storage_local/__init__.py
def save_file(self, file, f_key, response=None, *args, **kwargs):
print(f_key)
extension = self.model
storage_path = extension.profile.get('storage_path','/data')
p = Path(storage_path) / f_key
if not p.parent.exists():
p.parent.mkdir(parents=True)
with open(p, 'wb') as fp:
for chunk in file.chunks() if file else response:
fp.write(chunk)