静态文件存储插件:本地文件存储#
功能介绍#
实现静态文件的本地化存储,存储路径默认为/data
- 注意: 此为平台插件,需要平台管理员权限进行配置
配置指南#
实现思路#
开发者在开发静态文件存储插件时,需继承StorageExtension,并重载save_file和resolve方法即可
抽象方法实现#
代码#
        
extension_root.com_longgui_storage_local.LocalStorageExtension            (StorageExtension)
        
#
    Source code in extension_root/com_longgui_storage_local/__init__.py
          class LocalStorageExtension(StorageExtension):
    def load(self):
        self.register_profile_schema(ProfileSchema)
        self.register_api(
            "/localstorage/{file_name}",
            'GET',
            self.get_file,
            tenant_path=True,
            auth=None
        )
        super().load()
    def save_file(self, file, f_key, response=None, *args, **kwargs):
        print(f_key)
        extension = self.model
        storage_path = extension.profile.get('storage_path','/data')
        p = Path(storage_path) / f_key
        if not p.parent.exists():
            p.parent.mkdir(parents=True)
        with open(p, 'wb') as fp:
            for chunk in file.chunks() if file else response:
                fp.write(chunk)
    def resolve(self, f_key, tenant, *args, **kwargs):
        host = get_app_config().get_frontend_host()
        return f'{host}/api/v1/tenant/{tenant.id}/com_longgui_storage_local/localstorage/{f_key}'
    def get_file(self, request, tenant_id: str, file_name:str):
        """ 本地存储插件获取文件
        """
        extension = self.model
        storage_path = extension.profile.get('storage_path','/data')
        file_path = Path(storage_path) / file_name
        return FileResponse(
            open(file_path, 'rb')
        )
    def read(self,tenant_id,file_url,**kwargs):
        """读取文件数据
        Args:
            tenant_id (str): 租户ID
            file_url (str): 文件链接
        Returns:
            bytes: 文件数据
        """
        host = get_app_config().get_frontend_host()
        useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
        file_name = file_url.replace(useless_part, "")
        extension = self.model
        storage_path = extension.profile.get('storage_path','/data')
        file_path = Path(storage_path) / file_name
        rs = None
        with open(file_path,"rb") as f:
            rs = f.read()
        return rs
get_file(self, request, tenant_id, file_name)
#
    本地存储插件获取文件
Source code in extension_root/com_longgui_storage_local/__init__.py
          
        
load(self)
#
    
  
read(self, tenant_id, file_url, **kwargs)
#
    读取文件数据
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tenant_id | str | 租户ID | required | 
| file_url | str | 文件链接 | required | 
Returns:
| Type | Description | 
|---|---|
| bytes | 文件数据 | 
Source code in extension_root/com_longgui_storage_local/__init__.py
          def read(self,tenant_id,file_url,**kwargs):
    """读取文件数据
    Args:
        tenant_id (str): 租户ID
        file_url (str): 文件链接
    Returns:
        bytes: 文件数据
    """
    host = get_app_config().get_frontend_host()
    useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
    file_name = file_url.replace(useless_part, "")
    extension = self.model
    storage_path = extension.profile.get('storage_path','/data')
    file_path = Path(storage_path) / file_name
    rs = None
    with open(file_path,"rb") as f:
        rs = f.read()
    return rs
resolve(self, f_key, tenant, *args, **kwargs)
#
    生成文件链接
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| f_key | str | 存储文件名称 | required | 
| tenant | Tenant | 租户 | required | 
save_file(self, file, f_key, response=None, *args, **kwargs)
#
    保存文件
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file | File | 文件对象 | required | 
| f_key | str | 存储文件名称 | required | 
Source code in extension_root/com_longgui_storage_local/__init__.py
          def save_file(self, file, f_key, response=None, *args, **kwargs):
    print(f_key)
    extension = self.model
    storage_path = extension.profile.get('storage_path','/data')
    p = Path(storage_path) / f_key
    if not p.parent.exists():
        p.parent.mkdir(parents=True)
    with open(p, 'wb') as fp:
        for chunk in file.chunks() if file else response:
            fp.write(chunk)


