在 Linux 上防止指定文件夹下的文件被非法篡改,单纯依靠 Python 或 Go 脚本是无法做到完美实时“阻止”的,因为这些脚本运行在用户态,而真正的文件系统操作发生在内核态。
一个拥有足够权限的恶意程序或用户总能绕过你的脚本。
因此,最佳策略是分层防御:
操作系统层面的预防:利用 Linux 自身强大的权限和属性系统,这是最坚固的第一道防线。
应用程序层面的监控:使用 Python 或 Go 编写脚本,作为第二道防线,用于实时监控、记录和告警,以便在防御被突破时能立即发现并响应。
下面我将详细解释这两种策略,并提供 Python 和 Go 的代码实现。
第一层:利用Linux自身机制(最有效的预防)
在编写任何代码之前,首先应该使用 Linux 的内置工具。这是最直接、最有效、资源消耗最低的方法。
1. 设置严格的文件权限 (chmod, chown)确保文件夹和文件的所有者和权限设置正确。例如,如果文件只需要被特定用户(如 www-data)读取,就不要给予写权限。
# 假设目标文件夹是 /var/www/protected_content # 将所有权交给一个专用的、低权限的用户,例如 myappuser sudo chown -R myappuser:myappuser /var/www/protected_content # 对文件夹设置权限:所有者可读写执行,组用户可读执行,其他用户无权限 sudo chmod 750 /var/www/protected_content # 对文件设置权限:所有者可读写,组用户可读,其他用户无权限 # 如果文件不需要被程序修改,甚至可以设置为 440 (只读) sudo find /var/www/protected_content -type f -exec chmod 640 {} \; # 或者更严格的只读 sudo find /var/www/protected_content -type f -exec chmod 440 {} \;2. 使用不可变属性 (chattr) - 终极武器
这是防止篡改的最强方法。chattr命令可以给文件设置扩展属性。其中 +i (immutable) 属性意味着文件将变得不可变。
一旦文件被设置为 +i 属性,那么:
不能修改文件内容。
不能删除文件。
不能重命名文件。
不能创建硬链接。
甚至 root 用户也无法直接修改它!
如何使用:
# 对单个文件设置不可变属性 sudo chattr +i /var/www/protected_content/config.ini # 对文件夹下所有文件递归设置不可变属性 sudo chattr -R +i /var/www/protected_content # 如果需要合法地修改文件,必须先移除该属性 sudo chattr -i /var/www/protected_content/config.ini
优点:
极度安全,由内核强制执行。
几乎无性能开销。
缺点:
合法更新文件也变得繁琐,需要先 chattr -i,修改完后再 chattr +i。
无法防止新文件的创建。对于文件夹,可以设置 +a (append only),只允许追加内容,但不能修改已有内容。
第二层:使用Python/Go进行监控和告警(检测与响应)
当第一层防御可能因管理疏忽被绕过时(比如管理员忘记设置 chattr),监控脚本就派上用场了。它可以检测到任何预期之外的变更。
主要有两种实现思路:
定期轮询和哈希校验:定时扫描文件夹,计算每个文件的哈希值,与之前保存的“白名单”哈希值进行比对。
实时事件监控 (inotify):利用 Linux 内核的 inotify 机制,当文件系统发生变化时,程序会立即收到通知。这是更高效、更实时的方法。
方案一:哈希校验 (Python & Go)原理:
初始化:扫描目标文件夹,为每个文件计算一个加密哈希(如 SHA-256),将 (文件路径, 哈希值) 的映射关系保存到一个安全的地方(基线数据库)。
监控:定时重复步骤1,将新生成的哈希与基线数据库进行比较。
哈希值不匹配 -> 文件被篡改。
文件存在于基线但现在不见了 -> 文件被删除。
新文件出现 -> 文件夹中出现了未知文件。
Python 实现 (哈希校验)import os import hashlib import json import time TARGET_DIR = "/path/to/your/protected_folder" BASELINE_FILE = "/path/to/your/baseline.json" def calculate_sha256(filepath): """计算文件的SHA256哈希值""" sha256_hash = hashlib.sha256() try: with open(filepath, "rb") as f: # 逐块读取,防止大文件撑爆内存 for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() except IOError: return None def create_baseline(): """创建基线哈希数据库""" baseline = {} for dirpath, _, filenames in os.walk(TARGET_DIR): for filename in filenames: filepath = os.path.join(dirpath, filename) file_hash = calculate_sha256(filepath) if file_hash: baseline[filepath] = file_hash with open(BASELINE_FILE, "w") as f: json.dump(baseline, f, indent=4) print(f"Baseline created at {BASELINE_FILE}") def verify_integrity(): """校验文件完整性""" try: with open(BASELINE_FILE, "r") as f: baseline = json.load(f) except FileNotFoundError: print("Baseline file not found. Please create it first.") return current_state = {} for dirpath, _, filenames in os.walk(TARGET_DIR): for filename in filenames: filepath = os.path.join(dirpath, filename) current_state[filepath] = calculate_sha256(filepath) # 检查篡改和删除 for filepath, original_hash in baseline.items(): if filepath not in current_state: print(f"ALERT: File deleted! -> {filepath}") elif current_state[filepath] != original_hash: print(f"ALERT: File tampered! -> {filepath}") # 检查新增文件 for filepath in current_state: if filepath not in baseline: print(f"ALERT: New file created! -> {filepath}") if __name__ == "__main__": # 首次运行时,取消下面这行注释来创建基线 # create_baseline() # 启动持续监控 print(f"Starting monitoring for {TARGET_DIR}...") while True: verify_integrity() time.sleep(10) # 每10秒检查一次Go 实现 (哈希校验)
package main import ( "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" "io/ioutil" "log" "os" "path/filepath" "time" ) const ( targetDir = "/path/to/your/protected_folder" baselineFile = "/path/to/your/baseline.json" ) func calculateSHA256(filePath string) (string, error) { file, err := os.Open(filePath) if err != nil { return "", err } defer file.Close() hash := sha256.New() if _, err := io.Copy(hash, file); err != nil { return "", err } return hex.EncodeToString(hash.Sum(nil)), nil } func createBaseline() { baseline := make(map[string]string) err := filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { hash, err := calculateSHA256(path) if err != nil { log.Printf("Could not hash file %s: %v", path, err) return nil } baseline[path] = hash } return nil }) if err != nil { log.Fatalf("Error walking directory: %v", err) } data, err := json.MarshalIndent(baseline, "", " ") if err != nil { log.Fatalf("Could not marshal baseline to JSON: %v", err) } if err := ioutil.WriteFile(baselineFile, data, 0644); err != nil { log.Fatalf("Could not write baseline file: %v", err) } fmt.Printf("Baseline created at %s\n", baselineFile) } func verifyIntegrity() { data, err := ioutil.ReadFile(baselineFile) if err != nil { log.Printf("Baseline file not found. Please create it first.") return } var baseline map[string]string if err := json.Unmarshal(data, &baseline); err != nil { log.Printf("Could not unmarshal baseline: %v", err) return } currentState := make(map[string]string) filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error { if !info.IsDir() { hash, _ := calculateSHA256(path) currentState[path] = hash } return nil }) // Check for tampering and deletion for path, originalHash := range baseline { if currentHash, ok := currentState[path]; !ok { fmt.Printf("ALERT: File deleted! -> %s\n", path) } else if currentHash != originalHash { fmt.Printf("ALERT: File tampered! -> %s\n", path) } } // Check for new files for path := range currentState { if _, ok := baseline[path]; !ok { fmt.Printf("ALERT: New file created! -> %s\n", path) } } } func main() { // First time run, uncomment the line below // createBaseline() log.Printf("Starting monitoring for %s...", targetDir) for { verifyIntegrity() time.Sleep(10 * time.Second) } }
方案二:实时事件监控 (inotify)
这种方法更优越,因为它几乎是瞬时的,并且资源消耗更低(不需要反复扫描整个目录)。
Python 实现 (watchdog库)watchdog 是一个封装了 inotify 的优秀第三方库。
pip install watchdog
import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler TARGET_DIR = "/path/to/your/protected_folder" class TamperHandler(FileSystemEventHandler): def on_modified(self, event): if not event.is_directory: print(f"ALERT: File modified! -> {event.src_path}") def on_created(self, event): print(f"ALERT: New item created! -> {event.src_path}") def on_deleted(self, event): print(f"ALERT: Item deleted! -> {event.src_path}") def on_moved(self, event): print(f"ALERT: Item moved! from {event.src_path} to {event.dest_path}") if __name__ == "__main__": event_handler = TamperHandler() observer = Observer() # 递归监控所有子目录 observer.schedule(event_handler, TARGET_DIR, recursive=True) print(f"Starting real-time monitoring for {TARGET_DIR}...") observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()Go 实现 (fsnotify库)
fsnotify 是 Go 生态中用于文件系统通知的事实标准库。
go get github.com/fsnotify/fsnotify
package main import ( "log" "os" "path/filepath" "github.com/fsnotify/fsnotify" ) const targetDir = "/path/to/your/protected_folder" func main() { watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() done := make(chan bool) go func() { for { select { case event, ok := <-watcher.Events: if !ok { return } // fsnotify.Write: 写入或修改 if event.Op&fsnotify.Write == fsnotify.Write { log.Println("ALERT: File modified! ->", event.Name) } // fsnotify.Create: 创建 if event.Op&fsnotify.Create == fsnotify.Create { log.Println("ALERT: Item created! ->", event.Name) // 如果创建的是目录,需要将其也加入监控列表 info, err := os.Stat(event.Name) if err == nil && info.IsDir() { watcher.Add(event.Name) log.Println("Added new directory to watch list:", event.Name) } } // fsnotify.Remove: 删除 if event.Op&fsnotify.Remove == fsnotify.Remove { log.Println("ALERT: Item deleted! ->", event.Name) } // fsnotify.Rename: 重命名 (通常表现为旧文件的删除) if event.Op&fsnotify.Rename == fsnotify.Rename { log.Println("ALERT: Item renamed! ->", event.Name) } case err, ok := <-watcher.Errors: if !ok { return } log.Println("ERROR:", err) } } }() log.Printf("Starting real-time monitoring for %s and its subdirectories...", targetDir) // 递归添加所有子目录到监控列表 err = filepath.Walk(targetDir, func(path string, info os.FileInfo, err error) error { if info.IsDir() { return watcher.Add(path) } return nil }) if err != nil { log.Fatal("Error adding directories to watcher:", err) } <-done }
总结与最佳实践
首选 chattr +i:对于不需要变动的重要配置文件、核心代码文件等,chattr +i 是最简单、最安全的预防手段。
组合使用:将 chattr 和 inotify 监控脚本结合。chattr 负责核心防护,inotify 脚本负责监控整个文件夹,特别是检测非法新文件的创建(chattr 无法阻止)以及对合法修改流程的审计。
安全基线和脚本:如果你使用哈希校验,务必保护好你的基线文件 (baseline.json)!如果攻击者能修改它,你的校验就毫无意义。同样,监控脚本本身也应该被保护,防止被攻击者停止或修改。
告警机制:上述脚本只是打印到控制台。在实际应用中,你应该将告警信息通过邮件、Webhook(如 Slack, DingTalk)、Syslog 等方式发送给管理员,以便及时响应。
专业工具:对于企业级环境,可以考虑使用专业的HIDS(主机入侵检测系统),如 AIDE (Advanced Intrusion Detection Environment) 或 Tripwire,它们是专门为此目的设计的、更成熟的哈希校验工具。另外,基于eBPF的工具如 Falco 也能提供强大的实时监控能力。
网友回复