Skip to content

Static file storage plugin: Local file storage#

Features#

Stores static files on the local filesystem. The default storage path is /data.

  • Note: This is a platform plugin; platform administrator permissions are required to configure it.

Configuration guide#

From the menu bar on the left, go to 【Tenant management】->【Plugin management】, find the local file storage plugin card on the plugin rental page, and click Rent.
![Plugin rental](https://s1.ax1x.com/2022/08/02/vELyVS.png)

From the menu bar on the left, go to 【Platform management】->【Platform plugin】, find the local file storage plugin card on the Installed page, click Plugin configuration, and set the file storage path.
![Platform configuration](https://s1.ax1x.com/2022/08/02/vELXx1.png)
[![Platform configuration](https://s1.ax1x.com/2022/08/03/widget.md.png)](https://imgtu.com/i/VVD6)

Implementation#

To develop a static file storage plugin, inherit from StorageExtension and override the save_file and resolve methods.

Abstract method implementation#

Code#

extension_root.com_longgui_storage_local.LocalStorageExtension (StorageExtension) #

Source code in extension_root/com_longgui_storage_local/__init__.py
class LocalStorageExtension(StorageExtension):
    """Static file storage plugin backed by the local filesystem.

    Files are written below the ``storage_path`` entry of the plugin profile
    (``/data`` when the profile has no such entry) and are served back through
    the ``/localstorage/{file_name}`` GET route registered in ``load``.
    """

    def load(self):
        """Register the profile schema and the download route, then run the parent ``load``."""
        self.register_profile_schema(ProfileSchema)

        # NOTE(review): auth=None presumably leaves this route unauthenticated
        # so that generated file links can be opened directly -- confirm.
        self.register_api(
            "/localstorage/{file_name}",
            'GET',
            self.get_file,
            tenant_path=True,
            auth=None
        )

        super().load()

    def save_file(self, file, f_key, response=None, *args, **kwargs):
        """Write an uploaded file (or a chunked response body) to local storage.

        Args:
            file: Object exposing ``chunks()``; used whenever it is not None.
            f_key (str): Storage key, used as the path relative to the
                configured storage directory.
            response: Iterable of byte chunks, used only when ``file`` is None.

        Raises:
            TypeError: If both ``file`` and ``response`` are None.
        """
        # Fail before touching the filesystem so a bad call cannot create or
        # truncate the target file.
        if file is None and response is None:
            raise TypeError("save_file() requires either 'file' or 'response'")

        storage_path = self.model.profile.get('storage_path','/data')
        target = Path(storage_path) / f_key

        # exist_ok avoids the race between an exists() check and mkdir() when
        # two uploads create the same directory at the same time.
        target.parent.mkdir(parents=True, exist_ok=True)

        # Compare against None explicitly: file wrappers may define their own
        # truthiness (e.g. Django's File is falsy when it has no name).
        chunk_source = file.chunks() if file is not None else response

        with open(target, 'wb') as fp:
            for chunk in chunk_source:
                fp.write(chunk)

    def resolve(self, f_key, tenant, *args, **kwargs):
        """Build the download URL for a stored file.

        Args:
            f_key (str): Storage key of the file.
            tenant: Tenant whose ``id`` is embedded in the URL.

        Returns:
            str: URL under the frontend host that maps to ``get_file``.
        """
        host = get_app_config().get_frontend_host()
        return f'{host}/api/v1/tenant/{tenant.id}/com_longgui_storage_local/localstorage/{f_key}'


    def get_file(self, request, tenant_id: str, file_name:str):
        """Serve a stored file from the local storage directory.

        Args:
            request: Incoming request (not used by this method).
            tenant_id (str): Tenant ID from the route (not used by this method).
            file_name (str): Name of the stored file, taken from the URL.

        Returns:
            FileResponse: Response wrapping the opened file.

        Raises:
            FileNotFoundError: If ``file_name`` is absolute or contains ``..``,
                or if the file does not exist.
        """
        storage_path = self.model.profile.get('storage_path','/data')
        requested = Path(file_name)

        # file_name comes from the URL of a route registered with auth=None, so
        # refuse anything that could escape the storage directory: an absolute
        # path would replace the root when joined, and '..' would climb out of it.
        if requested.is_absolute() or '..' in requested.parts:
            raise FileNotFoundError(file_name)

        file_path = Path(storage_path) / requested

        return FileResponse(
            open(file_path, 'rb')
        )

    def read(self,tenant_id,file_url,**kwargs):
        """Read the bytes of a stored file identified by its download URL.

        Args:
            tenant_id (str): Tenant ID.
            file_url (str): File link, in the form produced by ``resolve``.

        Returns:
            bytes: File contents.
        """
        host = get_app_config().get_frontend_host()
        # Strip the URL prefix so only the storage key remains.
        useless_part = f'{host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
        file_name = file_url.replace(useless_part, "")
        storage_path = self.model.profile.get('storage_path','/data')
        file_path = Path(storage_path) / file_name

        with open(file_path,"rb") as f:
            return f.read()

get_file(self, request, tenant_id, file_name) #

Fetch a file through the local storage plugin

Source code in extension_root/com_longgui_storage_local/__init__.py
def get_file(self, request, tenant_id: str, file_name:str):
    """Serve a stored file from the local storage directory.

    Args:
        request: Incoming request (not used by this method).
        tenant_id (str): Tenant ID from the route (not used by this method).
        file_name (str): Name of the stored file, taken from the URL.

    Returns:
        FileResponse: Response wrapping the opened file.

    Raises:
        FileNotFoundError: If ``file_name`` is absolute or contains ``..``,
            or if the file does not exist.
    """
    storage_path = self.model.profile.get('storage_path','/data')
    requested = Path(file_name)

    # file_name is caller-supplied, so refuse anything that could escape the
    # storage directory: an absolute path would replace the root when joined,
    # and '..' would climb out of it.
    if requested.is_absolute() or '..' in requested.parts:
        raise FileNotFoundError(file_name)

    file_path = Path(storage_path) / requested

    return FileResponse(
        open(file_path, 'rb')
    )

load(self) #

Abstract method; the entry point called when the plugin is loaded

Source code in extension_root/com_longgui_storage_local/__init__.py
def load(self):
    """Register the profile schema and the download route, then run the parent ``load``."""
    self.register_profile_schema(ProfileSchema)

    # GET route that hands stored files back via get_file.
    # NOTE(review): auth=None presumably leaves the route unauthenticated -- confirm.
    download_route = "/localstorage/{file_name}"
    self.register_api(download_route, 'GET', self.get_file, tenant_path=True, auth=None)

    super().load()

read(self, tenant_id, file_url, **kwargs) #

Read file data

Parameters:

Name Type Description Default
tenant_id str

Tenant ID

required
file_url str

File link

required

Returns:

Type Description
bytes

File data

Source code in extension_root/com_longgui_storage_local/__init__.py
def read(self,tenant_id,file_url,**kwargs):
    """Read the bytes of a stored file identified by its download URL.

    Args:
        tenant_id (str): Tenant ID.
        file_url (str): File link.

    Returns:
        bytes: File data.
    """
    frontend_host = get_app_config().get_frontend_host()
    # Everything up to the storage key is removed from the link, leaving the
    # path of the file relative to the storage directory.
    url_prefix = f'{frontend_host}/api/v1/tenant/{tenant_id}/com_longgui_storage_local/localstorage/'
    relative_name = file_url.replace(url_prefix, "")
    storage_root = self.model.profile.get('storage_path','/data')

    with open(Path(storage_root) / relative_name,"rb") as stream:
        return stream.read()

resolve(self, f_key, tenant, *args, **kwargs) #

Generate the file link

Parameters:

Name Type Description Default
f_key str

Stored file name

required
tenant Tenant

Tenant

required
Source code in extension_root/com_longgui_storage_local/__init__.py
def resolve(self, f_key, tenant, *args, **kwargs):
    """Build the download URL for a stored file.

    Args:
        f_key (str): Stored file name.
        tenant: Tenant whose ``id`` is embedded in the URL.

    Returns:
        str: File link under the frontend host.
    """
    frontend_host = get_app_config().get_frontend_host()
    file_url = f'{frontend_host}/api/v1/tenant/{tenant.id}/com_longgui_storage_local/localstorage/{f_key}'
    return file_url

save_file(self, file, f_key, response=None, *args, **kwargs) #

Save a file

Parameters:

Name Type Description Default
file File

File object

required
f_key str

Stored file name

required
Source code in extension_root/com_longgui_storage_local/__init__.py
def save_file(self, file, f_key, response=None, *args, **kwargs):
    """Write an uploaded file (or a chunked response body) to local storage.

    Args:
        file: Object exposing ``chunks()``; used whenever it is not None.
        f_key (str): Storage key, used as the path relative to the
            configured storage directory.
        response: Iterable of byte chunks, used only when ``file`` is None.

    Raises:
        TypeError: If both ``file`` and ``response`` are None.
    """
    # Fail before touching the filesystem so a bad call cannot create or
    # truncate the target file.
    if file is None and response is None:
        raise TypeError("save_file() requires either 'file' or 'response'")

    storage_path = self.model.profile.get('storage_path','/data')
    target = Path(storage_path) / f_key

    # exist_ok avoids the race between an exists() check and mkdir() when
    # two uploads create the same directory at the same time.
    target.parent.mkdir(parents=True, exist_ok=True)

    # Compare against None explicitly: file wrappers may define their own
    # truthiness (e.g. Django's File is falsy when it has no name).
    chunk_source = file.chunks() if file is not None else response

    with open(target, 'wb') as fp:
        for chunk in chunk_source:
            fp.write(chunk)

Comments