+
12
-

回答

在Python中实现类似PHP的FastCGI多租户虚拟空间,需要实现目录隔离、权限控制等功能。以下是一个完整的实现方案:

1. 基础FastCGI服务器实现

import os
import sys
import pwd
import grp
import threading
from flup.server.fcgi import WSGIServer
import configparser
from pathlib import Path

class MultiTenantFastCGI:
    """Registry of tenant settings loaded from an INI configuration file.

    Every section whose name starts with ``tenant_`` describes one tenant;
    the rest of the section name is used as the tenant name.

    Attributes:
        config: The ``ConfigParser`` holding the parsed file.
        tenants: Mapping of tenant name to a settings dict with the keys
            ``domain``, ``root_dir``, ``uid``, ``gid``, ``max_memory`` and
            ``max_processes``.
    """

    # Section-name prefix that marks a tenant definition.
    _SECTION_PREFIX = 'tenant_'

    def __init__(self, config_file):
        """Parse *config_file* and load every tenant section it contains.

        Note that ``ConfigParser.read`` silently skips files it cannot
        open, so a missing file results in an empty ``tenants`` mapping.
        """
        self.config = configparser.ConfigParser()
        self.config.read(config_file)
        self.tenants = {}
        self.load_tenants()

    def load_tenants(self):
        """Populate ``self.tenants`` from the ``tenant_*`` sections.

        ``domain``, ``root_dir``, ``uid`` and ``gid`` are mandatory;
        ``max_memory`` defaults to 128 and ``max_processes`` to 5.

        Raises:
            configparser.NoOptionError: A mandatory option is missing.
            ValueError: A numeric option is not a valid integer.
        """
        prefix = self._SECTION_PREFIX
        for section in self.config.sections():
            if not section.startswith(prefix):
                continue
            # Strip only the leading prefix. str.replace() would also remove
            # later occurrences ("tenant_my_tenant_site" -> "my_site").
            tenant_name = section[len(prefix):]
            self.tenants[tenant_name] = {
                'domain': self.config.get(section, 'domain'),
                'root_dir': self.config.get(section, 'root_dir'),
                'uid': self.config.getint(section, 'uid'),
                'gid': self.config.getint(section, 'gid'),
                'max_memory': self.config.getint(section, 'max_memory', fallback=128),
                'max_processes': self.config.getint(section, 'max_processes', fallback=5),
            }

2. 目录隔离和沙箱环境

import resource
import subprocess
import tempfile

class SandboxEnvironment:
    """Per-tenant sandbox: chroot jail, privilege drop and resource limits.

    Attributes:
        tenant_config: Settings dict of the tenant; ``max_memory`` and
            ``max_processes`` are read by :meth:`set_resource_limits`.
        root_dir: Directory that becomes the chroot jail.
        uid: User id the process switches to after entering the jail.
        gid: Group id the process switches to after entering the jail.
    """

    def __init__(self, tenant_config):
        """Remember the tenant settings and cache root_dir, uid and gid."""
        self.tenant_config = tenant_config
        self.root_dir = tenant_config['root_dir']
        self.uid = tenant_config['uid']
        self.gid = tenant_config['gid']

    def setup_chroot(self):
        """Enter the tenant directory and, when running as root, jail the process.

        The jail skeleton is prepared first and the working directory is
        changed to ``root_dir``. Only with an effective uid of 0 does the
        method call ``chroot`` and then drop to the tenant's gid/uid.
        """
        # Create the directory skeleton and copy resolver files into place.
        self._prepare_chroot_env()

        os.chdir(self.root_dir)

        # chroot() and the set*id() calls require root privileges.
        if os.geteuid() == 0:
            os.chroot(self.root_dir)
            os.chdir('/')

            # Drop supplementary groups before switching ids; otherwise the
            # process keeps the group memberships it had as root.
            os.setgroups([])
            # The group must be changed while still root: after setuid()
            # the process may no longer change its gid.
            os.setgid(self.gid)
            os.setuid(self.uid)

    def _prepare_chroot_env(self):
        """Create the basic directory layout and files inside ``root_dir``."""
        required_dirs = ['/dev', '/proc', '/tmp', '/var/tmp']

        for dir_path in required_dirs:
            # Strip the leading slash so the path is joined below root_dir
            # instead of replacing it.
            full_path = os.path.join(self.root_dir, dir_path.lstrip('/'))
            os.makedirs(full_path, exist_ok=True)

        self._copy_system_files()

    def _copy_system_files(self):
        """Copy the host's resolver configuration files into the jail.

        Files that do not exist on the host are skipped.

        Raises:
            OSError: A file exists on the host but could not be copied.
        """
        # Imported locally so this block does not depend on a module-level
        # import being added elsewhere in the file.
        import shutil

        system_files = [
            '/etc/resolv.conf',
            '/etc/hosts',
            '/etc/nsswitch.conf',
        ]

        etc_dir = os.path.join(self.root_dir, 'etc')
        os.makedirs(etc_dir, exist_ok=True)

        for file_path in system_files:
            if os.path.exists(file_path):
                dest = os.path.join(self.root_dir, file_path.lstrip('/'))
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                # Copy in-process rather than spawning `cp`, so a failure
                # raises instead of being silently ignored.
                shutil.copy(file_path, dest)

    def set_resource_limits(self):
        """Apply address-space, process-count and open-file limits.

        ``max_memory`` is multiplied by 1024 * 1024 (i.e. it is given in
        MiB) and applied as ``RLIMIT_AS``; ``max_processes`` is applied as
        ``RLIMIT_NPROC``; ``RLIMIT_NOFILE`` is fixed at 256. Soft and hard
        limits are set to the same value.
        """
        max_memory = self.tenant_config['max_memory'] * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))

        max_processes = self.tenant_config['max_processes']
        resource.setrlimit(resource.RLIMIT_NPROC, (max_processes, max_processes))

        resource.setrlimit(resource.RLIMIT_NOFILE, (256, 256))

3. 安全的文件系统操作

import os
import stat
from pathlib import Path

class SecureFileSystem:
    """File-system helper that confines every operation to one base directory.

    Paths are resolved (following symlinks) and rejected when the result
    lies outside ``base_path``. Permission checks are evaluated against the
    tenant's uid/gid using the file's mode bits, not against the identity
    of the running process.

    Attributes:
        base_path: Resolved ``Path`` of the directory that bounds all access.
        uid: User id used for permission checks and for ownership of
            written files.
        gid: Group id used for permission checks and for ownership of
            written files.
    """

    def __init__(self, base_path, uid, gid):
        """Store the resolved base directory and the tenant's uid/gid."""
        self.base_path = Path(base_path).resolve()
        self.uid = uid
        self.gid = gid

    def _validate_path(self, path):
        """Return the resolved form of *path* if it lies inside ``base_path``.

        Raises:
            PermissionError: The resolved path escapes ``base_path`` or
                could not be resolved.
        """
        try:
            resolved_path = Path(path).resolve()
            # relative_to() raises ValueError when resolved_path is not
            # located under base_path.
            resolved_path.relative_to(self.base_path)
        except (ValueError, RuntimeError):
            raise PermissionError(f"Access denied: {path}") from None
        return resolved_path

    def opendir(self, path):
        """Return a listing of the directory at *path*.

        Returns:
            A list of dicts with the keys ``name``, ``type``, ``size``,
            ``mtime`` and ``permissions``.

        Raises:
            PermissionError: *path* is outside ``base_path`` or not
                readable for the tenant.
            NotADirectoryError: *path* is not a directory.
        """
        safe_path = self._validate_path(path)

        if not self._check_permission(safe_path, os.R_OK):
            raise PermissionError(f"Permission denied: {path}")

        if not safe_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        return self._list_directory(safe_path)

    def _list_directory(self, path):
        """Describe every entry directly inside the directory *path*."""
        items = []
        for item in path.iterdir():
            try:
                stat_info = item.stat()
            except OSError:
                # stat() follows symlinks and fails on a dangling link;
                # describe the link itself instead of aborting the listing.
                try:
                    stat_info = item.lstat()
                except OSError:
                    # The entry disappeared while the directory was read.
                    continue
            items.append({
                'name': item.name,
                'type': 'dir' if item.is_dir() else 'file',
                'size': stat_info.st_size,
                'mtime': stat_info.st_mtime,
                # Last three octal digits: owner/group/other permission bits.
                'permissions': oct(stat_info.st_mode)[-3:],
            })
        return items

    def _check_permission(self, path, mode):
        """Return True if the tenant's uid/gid is granted *mode* on *path*.

        *mode* is a combination of ``os.R_OK``, ``os.W_OK`` and ``os.X_OK``;
        every requested bit must be granted. The owner bits apply when the
        file's uid matches, otherwise the group bits when its gid matches,
        otherwise the "other" bits. A mode of 0 (``os.F_OK``) and any path
        that cannot be stat'ed yield False.
        """
        try:
            stat_info = path.stat()
        except OSError:
            return False

        if stat_info.st_uid == self.uid:
            shift = 6
        elif stat_info.st_gid == self.gid:
            shift = 3
        else:
            shift = 0

        required = mode << shift
        # All requested bits must be present; testing for "any bit set"
        # would grant R_OK | W_OK on a read-only file.
        return required != 0 and (stat_info.st_mode & required) == required

    def read_file(self, path, max_size=10*1024*1024):
        """Return the text content of the file at *path*.

        Raises:
            PermissionError: *path* is outside ``base_path`` or not
                readable for the tenant.
            ValueError: The file is larger than *max_size* bytes.
        """
        safe_path = self._validate_path(path)

        if not self._check_permission(safe_path, os.R_OK):
            raise PermissionError(f"Permission denied: {path}")

        if safe_path.stat().st_size > max_size:
            raise ValueError(f"File too large: {path}")

        with open(safe_path, 'r') as f:
            return f.read()

    def write_file(self, path, content):
        """Write the text *content* to *path* and hand the file to the tenant.

        Raises:
            PermissionError: *path* is outside ``base_path`` or its parent
                directory is not writable for the tenant.
        """
        safe_path = self._validate_path(path)
        parent_dir = safe_path.parent

        # Creating or replacing a file requires write access to its directory.
        if not self._check_permission(parent_dir, os.W_OK):
            raise PermissionError(f"Permission denied: {path}")

        with open(safe_path, 'w') as f:
            f.write(content)

        os.chown(safe_path, self.uid, self.gid)

4. WSGI应用处理器

class TenantWSGIApp:
    """WSGI application that serves files from a single tenant's root directory.

    GET requests return a directory listing or a file's text content.
    POST requests are answered with 501 because no upload handling exists.

    NOTE(review): ``setup_environment`` is invoked on every request inside
    the serving process. Resource limits, chroot and setuid all affect that
    whole process, so this only isolates tenants when each tenant is
    served by its own worker process - confirm the deployment model.

    Attributes:
        tenant_name: Name of the tenant this app serves.
        tenant_config: Settings dict of the tenant (``root_dir``, ``uid``,
            ``gid`` are read here).
        sandbox: ``SandboxEnvironment`` built from the tenant settings.
        fs: ``SecureFileSystem`` rooted at the tenant's ``root_dir``.
    """

    def __init__(self, tenant_name, tenant_config):
        """Build the sandbox and file-system helpers for one tenant."""
        self.tenant_name = tenant_name
        self.tenant_config = tenant_config
        self.sandbox = SandboxEnvironment(tenant_config)
        self.fs = SecureFileSystem(
            tenant_config['root_dir'],
            tenant_config['uid'],
            tenant_config['gid']
        )

    def __call__(self, environ, start_response):
        """WSGI entry point: dispatch on the request method.

        Any exception raised while handling the request is converted into
        an error response by :meth:`handle_error`.
        """
        try:
            self.setup_environment()

            path = environ.get('PATH_INFO', '/')
            method = environ.get('REQUEST_METHOD', 'GET')

            if method == 'GET':
                response = self.handle_get(path)
            elif method == 'POST':
                response = self.handle_post(path, environ)
            else:
                response = self.error_response(405, 'Method Not Allowed')

            status = response['status']
            headers = response['headers']
            start_response(status, headers)

            return [response['body'].encode('utf-8')]

        except Exception as e:
            return self.handle_error(e, start_response)

    def setup_environment(self):
        """Apply the tenant's resource limits and, as root, enter the chroot."""
        self.sandbox.set_resource_limits()

        if os.geteuid() == 0:
            self.sandbox.setup_chroot()

    def handle_get(self, path):
        """Serve the directory listing or file content for *path*.

        Returns:
            A response dict with the keys ``status``, ``headers`` and
            ``body``: 200 on success, 403 when the target lies outside the
            tenant root or is not readable, 404 when it does not exist.
        """
        root = os.path.realpath(self.tenant_config['root_dir'])
        target = os.path.realpath(os.path.join(root, path.lstrip('/')))

        # Reject anything that resolves outside the tenant root *before*
        # touching it, so "../" requests cannot probe which paths exist on
        # the host (403 vs. 404).
        if os.path.commonpath([root, target]) != root:
            return self.error_response(403, 'Forbidden')

        if os.path.isdir(target):
            try:
                items = self.fs.opendir(target)
            except PermissionError:
                return self.error_response(403, 'Forbidden')
            return {
                'status': '200 OK',
                'headers': [('Content-Type', 'text/html')],
                'body': self.render_directory_listing(path, items),
            }

        if os.path.isfile(target):
            try:
                content = self.fs.read_file(target)
            except PermissionError:
                return self.error_response(403, 'Forbidden')
            return {
                'status': '200 OK',
                'headers': [('Content-Type', 'text/plain')],
                'body': content,
            }

        return self.error_response(404, 'Not Found')

    def handle_post(self, path, environ):
        """Answer POST requests; no POST behaviour is implemented yet.

        Returns:
            A ``501 Not Implemented`` response dict.
        """
        return self.error_response(501, 'Not Implemented')

    def handle_error(self, error, start_response):
        """Turn an unexpected exception into a WSGI error response.

        ``PermissionError`` maps to 403; everything else maps to 500. The
        exception is reported on stderr and never shown to the client.

        Returns:
            The WSGI body iterable (a one-element list of bytes).
        """
        print(f'[{self.tenant_name}] request failed: {error!r}', file=sys.stderr)

        if isinstance(error, PermissionError):
            response = self.error_response(403, 'Forbidden')
        else:
            response = self.error_response(500, 'Internal Server Error')

        start_response(response['status'], response['headers'])
        return [response['body'].encode('utf-8')]

    def render_directory_listing(self, path, items):
        """Render *items* (as returned by ``SecureFileSystem.opendir``) as HTML.

        The request path and the file names come from untrusted input, so
        they are HTML-escaped, and link targets are percent-encoded as well.
        """
        # Imported locally so this block does not depend on module-level
        # imports being added elsewhere in the file.
        import html
        import posixpath
        from urllib.parse import quote

        title = html.escape(path)
        parts = [f"""
        <html>
        <head><title>Directory: {title}</title></head>
        <body>
            <h1>Directory: {title}</h1>
            <ul>
        """]

        for item in items:
            name = item['name']
            label = html.escape(name)
            # URLs always use "/" as separator, hence posixpath.
            href = html.escape(quote(posixpath.join(path, name)))
            if item['type'] == 'dir':
                parts.append(f'<li><a href="{href}/">[DIR] {label}</a></li>')
            else:
                parts.append(
                    f'<li><a href="{href}">{label}</a> ({item["size"]} bytes)</li>'
                )

        parts.append("""
            </ul>
        </body>
        </html>
        """)
        return ''.join(parts)

    def error_response(self, code, message):
        """Build a plain-text response dict for the given status code."""
        return {
            'status': f'{code} {message}',
            'headers': [('Content-Type', 'text/plain')],
            'body': message
        }

5. 主服务器启动

def create_tenant_app(tenant_name, config):
    """Create a dedicated application instance for one tenant.

    Args:
        tenant_name: Key of the tenant in the tenant mapping.
        config: Either a mapping with a ``'tenants'`` entry or an object
            exposing a ``tenants`` attribute (such as the
            ``MultiTenantFastCGI`` server that ``main`` passes in).

    Returns:
        A ``TenantWSGIApp`` configured for the tenant.

    Raises:
        KeyError: *tenant_name* is not a known tenant.
    """
    # The server object is not subscriptable, so look for the attribute
    # first and fall back to mapping access.
    tenants = getattr(config, 'tenants', None)
    if tenants is None:
        tenants = config['tenants']
    return TenantWSGIApp(tenant_name, tenants[tenant_name])

def main():
    """Load ``tenants.conf`` and serve all tenants over FastCGI on 127.0.0.1:9000.

    Each request is routed to the tenant whose ``domain`` matches the
    request's Host header; unknown hosts receive a 404 response.

    Example ``tenants.conf``::

        [tenant_user1]
        domain = user1.example.com
        root_dir = /var/www/tenants/user1
        uid = 1001
        gid = 1001
        max_memory = 128
        max_processes = 5

        [tenant_user2]
        domain = user2.example.com
        root_dir = /var/www/tenants/user2
        uid = 1002
        gid = 1002
        max_memory = 256
        max_processes = 10
    """
    server = MultiTenantFastCGI('tenants.conf')

    def application(environ, start_response):
        """Dispatch the request to the tenant selected by the Host header."""
        host = environ.get('HTTP_HOST', '').strip().lower()

        # A Host header may carry a port ("user1.example.com:8080"); drop it
        # so it can be compared with the configured domain. Bracketed IPv6
        # literals without a port are left untouched.
        name, sep, port = host.rpartition(':')
        if sep and port.isdigit():
            host = name

        for tenant_name, config in server.tenants.items():
            # Host names are case-insensitive.
            if config['domain'].lower() == host:
                # create_tenant_app subscripts its argument with 'tenants',
                # so hand it a mapping rather than the server object.
                app = create_tenant_app(tenant_name, {'tenants': server.tenants})
                return app(environ, start_response)

        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return [b'Tenant not found']

    WSGIServer(application, bindAddress=('127.0.0.1', 9000)).run()

# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()

6. Nginx配置示例

# Forward every request for any example.com subdomain to the Python FastCGI
# server listening on 127.0.0.1:9000.
server {
    listen 80;
    server_name *.example.com;

    location / {
        fastcgi_pass 127.0.0.1:9000;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        # $fastcgi_path_info is only populated by fastcgi_split_path_info,
        # which this location does not use, so it would always be empty.
        # Pass the request path itself so the application sees it as PATH_INFO.
        fastcgi_param PATH_INFO $fastcgi_script_name;
        include fastcgi_params;
    }
}

这个实现提供了:

目录隔离:通过chroot和路径验证

权限控制:基于UID/GID的权限检查

资源限制:内存、进程数、文件描述符数限制

安全性:路径遍历保护、文件大小限制

多租户支持:基于域名的租户识别

使用时需要注意:

需要root权限才能使用chroot;以非root身份运行时会跳过chroot和降权,此时隔离仅依赖路径验证

建议使用容器技术(Docker)增强隔离

定期更新和审计安全配置

网友回复

我知道答案,我要回答