+
24
-

回答

在 Linux 上防止指定文件夹下的文件被非法篡改,单纯依靠 Python 或 Go 脚本是无法做到完美实时“阻止”的,因为这些脚本运行在用户态,而真正的文件系统操作发生在内核态。

一个拥有足够权限的恶意程序或用户总能绕过你的脚本。

因此,最佳策略是分层防御

操作系统层面的预防:利用 Linux 自身强大的权限和属性系统,这是最坚固的第一道防线。

应用程序层面的监控:使用 Python 或 Go 编写脚本,作为第二道防线,用于实时监控、记录和告警,以便在防御被突破时能立即发现并响应。

下面我将详细解释这两种策略,并提供 Python 和 Go 的代码实现。

第一层:利用Linux自身机制(最有效的预防)

在编写任何代码之前,首先应该使用 Linux 的内置工具。这是最直接、最有效、资源消耗最低的方法。

1. 设置严格的文件权限 (chmod, chown)

确保文件夹和文件的所有者和权限设置正确。例如,如果文件只需要被特定用户(如 www-data)读取,就不要给予写权限。

# 假设目标文件夹是 /var/www/protected_content
# 将所有权交给一个专用的、低权限的用户,例如 myappuser
sudo chown -R myappuser:myappuser /var/www/protected_content

# 对文件夹设置权限:所有者可读写执行,组用户可读执行,其他用户无权限
sudo chmod 750 /var/www/protected_content

# 对文件设置权限:所有者可读写,组用户可读,其他用户无权限
# 如果文件不需要被程序修改,甚至可以设置为 440 (只读)
sudo find /var/www/protected_content -type f -exec chmod 640 {} \;
# 或者更严格的只读
sudo find /var/www/protected_content -type f -exec chmod 440 {} \;
2. 使用不可变属性 (chattr) - 终极武器

这是防止篡改的最强方法。chattr命令可以给文件设置扩展属性。其中 +i (immutable) 属性意味着文件将变得不可变。

一旦文件被设置为 +i 属性,那么:

不能修改文件内容。

不能删除文件。

不能重命名文件。

不能创建硬链接。

甚至 root 用户也无法直接修改它!

如何使用:

# 对单个文件设置不可变属性
sudo chattr +i /var/www/protected_content/config.ini

# 对文件夹下所有文件递归设置不可变属性
sudo chattr -R +i /var/www/protected_content

# 如果需要合法地修改文件,必须先移除该属性
sudo chattr -i /var/www/protected_content/config.ini

优点:

极度安全,由内核强制执行。

几乎无性能开销。

缺点:

合法更新文件也变得繁琐,需要先 chattr -i,修改完后再 chattr +i。

无法防止新文件的创建。对于文件夹,可以设置 +a (append only),只允许追加内容,但不能修改已有内容。

第二层:使用Python/Go进行监控和告警(检测与响应)

当第一层防御可能因管理疏忽被绕过时(比如管理员忘记设置 chattr),监控脚本就派上用场了。它可以检测到任何预期之外的变更。

主要有两种实现思路:

定期轮询和哈希校验:定时扫描文件夹,计算每个文件的哈希值,与之前保存的“白名单”哈希值进行比对。

实时事件监控 (inotify):利用 Linux 内核的 inotify 机制,当文件系统发生变化时,程序会立即收到通知。这是更高效、更实时的方法。

方案一:哈希校验 (Python & Go)

原理

初始化:扫描目标文件夹,为每个文件计算一个加密哈希(如 SHA-256),将 (文件路径, 哈希值) 的映射关系保存到一个安全的地方(基线数据库)。

监控:定时重复步骤1,将新生成的哈希与基线数据库进行比较。

哈希值不匹配 -> 文件被篡改。

文件存在于基线但现在不见了 -> 文件被删除。

新文件出现 -> 文件夹中出现了未知文件。

Python 实现 (哈希校验)
import os
import hashlib
import json
import time

TARGET_DIR = "/path/to/your/protected_folder"
BASELINE_FILE = "/path/to/your/baseline.json"

def calculate_sha256(filepath):
    """计算文件的SHA256哈希值"""
    sha256_hash = hashlib.sha256()
    try:
        with open(filepath, "rb") as f:
            # 逐块读取,防止大文件撑爆内存
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    except IOError:
        return None

def create_baseline():
    """创建基线哈希数据库"""
    baseline = {}
    for dirpath, _, filenames in os.walk(TARGET_DIR):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            file_hash = calculate_sha256(filepath)
            if file_hash:
                baseline[filepath] = file_hash

    with open(BASELINE_FILE, "w") as f:
        json.dump(baseline, f, indent=4)
    print(f"Baseline created at {BASELINE_FILE}")

def verify_integrity():
    """校验文件完整性"""
    try:
        with open(BASELINE_FILE, "r") as f:
            baseline = json.load(f)
    except FileNotFoundError:
        print("Baseline file not found. Please create it first.")
        return

    current_state = {}
    for dirpath, _, filenames in os.walk(TARGET_DIR):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            current_state[filepath] = calculate_sha256(filepath)

    # 检查篡改和删除
    for filepath, original_hash in baseline.items():
        if filepath not in current_state:
            print(f"ALERT: File deleted! -> {filepath}")
        elif current_state[filepath] != original_hash:
            print(f"ALERT: File tampered! -> {filepath}")

    # 检查新增文件
    for filepath in current_state:
        if filepath not in baseline:
            print(f"ALERT: New file created! -> {filepath}")

if __name__ == "__main__":
    # 首次运行时,取消下面这行注释来创建基线
    # create_baseline()

    # 启动持续监控
    print(f"Starting monitoring for {TARGET_DIR}...")
    while True:
        verify_integrity()
        time.sleep(10) # 每10秒检查一次
Go 实现 (哈希校验)
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "time"
)

const (
    targetDir     = "/path/to/your/protected_folder"
    baselineFile  = "/path/to/your/baseline.json"
)

func calculateSHA256(filePath string) (string, error) {
    file, err := os.Open(filePath)
    if err != nil {
        return "", err
    }
    defer file.Close()

    hash := sha256.New()
    if _, err := io.Copy(hash, file); err != nil {
        return "", err
    }
    return hex.EncodeToString(hash.Sum(nil)), nil
}

func createBaseline() {
    baseline := make(map[string]string)
    err := filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.IsDir() {
            hash, err := calculateSHA256(path)
            if err != nil {
                log.Printf("Could not hash file %s: %v", path, err)
                return nil
            }
            baseline[path] = hash
        }
        return nil
    })

    if err != nil {
        log.Fatalf("Error walking directory: %v", err)
    }

    data, err := json.MarshalIndent(baseline, "", "  ")
    if err != nil {
        log.Fatalf("Could not marshal baseline to JSON: %v", err)
    }

    if err := ioutil.WriteFile(baselineFile, data, 0644); err != nil {
        log.Fatalf("Could not write baseline file: %v", err)
    }
    fmt.Printf("Baseline created at %s\n", baselineFile)
}

func verifyIntegrity() {
    data, err := ioutil.ReadFile(baselineFile)
    if err != nil {
        log.Printf("Baseline file not found. Please create it first.")
        return
    }

    var baseline map[string]string
    if err := json.Unmarshal(data, &baseline); err != nil {
        log.Printf("Could not unmarshal baseline: %v", err)
        return
    }

    currentState := make(map[string]string)
    filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
        if !info.IsDir() {
            hash, _ := calculateSHA256(path)
            currentState[path] = hash
        }
        return nil
    })

    // Check for tampering and deletion
    for path, originalHash := range baseline {
        if currentHash, ok := currentState[path]; !ok {
            fmt.Printf("ALERT: File deleted! -> %s\n", path)
        } else if currentHash != originalHash {
            fmt.Printf("ALERT: File tampered! -> %s\n", path)
        }
    }

    // Check for new files
    for path := range currentState {
        if _, ok := baseline[path]; !ok {
            fmt.Printf("ALERT: New file created! -> %s\n", path)
        }
    }
}

func main() {
    // First time run, uncomment the line below
    // createBaseline()

    log.Printf("Starting monitoring for %s...", targetDir)
    for {
        verifyIntegrity()
        time.Sleep(10 * time.Second)
    }
}

方案二:实时事件监控 (inotify)

这种方法更优越,因为它几乎是瞬时的,并且资源消耗更低(不需要反复扫描整个目录)。

Python 实现 (watchdog库)

watchdog 是一个封装了 inotify 的优秀第三方库。

pip install watchdog
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

TARGET_DIR = "/path/to/your/protected_folder"

class TamperHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"ALERT: File modified! -> {event.src_path}")

    def on_created(self, event):
        print(f"ALERT: New item created! -> {event.src_path}")

    def on_deleted(self, event):
        print(f"ALERT: Item deleted! -> {event.src_path}")

    def on_moved(self, event):
        print(f"ALERT: Item moved! from {event.src_path} to {event.dest_path}")


if __name__ == "__main__":
    event_handler = TamperHandler()
    observer = Observer()
    # 递归监控所有子目录
    observer.schedule(event_handler, TARGET_DIR, recursive=True)

    print(f"Starting real-time monitoring for {TARGET_DIR}...")
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Go 实现 (fsnotify库)

fsnotify 是 Go 生态中用于文件系统通知的事实标准库。

go get github.com/fsnotify/fsnotify
package main

import (
    "log"
    "os"
    "path/filepath"

    "github.com/fsnotify/fsnotify"
)

const targetDir = "/path/to/your/protected_folder"

func main() {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    done := make(chan bool)

    go func() {
        for {
            select {
            case event, ok := <-watcher.Events:
                if !ok {
                    return
                }
                // fsnotify.Write: 写入或修改
                if event.Op&fsnotify.Write == fsnotify.Write {
                    log.Println("ALERT: File modified! ->", event.Name)
                }
                // fsnotify.Create: 创建
                if event.Op&fsnotify.Create == fsnotify.Create {
                    log.Println("ALERT: Item created! ->", event.Name)
                    // 如果创建的是目录,需要将其也加入监控列表
                    info, err := os.Stat(event.Name)
                    if err == nil && info.IsDir() {
                        watcher.Add(event.Name)
                        log.Println("Added new directory to watch list:", event.Name)
                    }
                }
                // fsnotify.Remove: 删除
                if event.Op&fsnotify.Remove == fsnotify.Remove {
                    log.Println("ALERT: Item deleted! ->", event.Name)
                }
                // fsnotify.Rename: 重命名 (通常表现为旧文件的删除)
                if event.Op&fsnotify.Rename == fsnotify.Rename {
                    log.Println("ALERT: Item renamed! ->", event.Name)
                }

            case err, ok := <-watcher.Errors:
                if !ok {
                    return
                }
                log.Println("ERROR:", err)
            }
        }
    }()

    log.Printf("Starting real-time monitoring for %s and its subdirectories...", targetDir)

    // 递归添加所有子目录到监控列表
    err = filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error {
        if info.IsDir() {
            return watcher.Add(path)
        }
        return nil
    })

    if err != nil {
        log.Fatal("Error adding directories to watcher:", err)
    }

    <-done
}

总结与最佳实践

首选 chattr +i:对于不需要变动的重要配置文件、核心代码文件等,chattr +i 是最简单、最安全的预防手段。

组合使用:将 chattr 和 inotify 监控脚本结合。chattr 负责核心防护,inotify 脚本负责监控整个文件夹,特别是检测非法新文件的创建(chattr 无法阻止)以及对合法修改流程的审计。

安全基线和脚本:如果你使用哈希校验,务必保护好你的基线文件 (baseline.json)!如果攻击者能修改它,你的校验就毫无意义。同样,监控脚本本身也应该被保护,防止被攻击者停止或修改。

告警机制:上述脚本只是打印到控制台。在实际应用中,你应该将告警信息通过邮件、Webhook(如 Slack, DingTalk)、Syslog 等方式发送给管理员,以便及时响应。

专业工具:对于企业级环境,可以考虑使用专业的HIDS(主机入侵检测系统),如 AIDE (Advanced Intrusion Detection Environment) 或 Tripwire,它们是专门为此目的设计的、更成熟的哈希校验工具。另外,基于eBPF的工具如 Falco 也能提供强大的实时监控能力。

网友回复

我知道答案,我要回答