如何在 Python 中跟踪日志文件?

2024-12-27 08:47:00
admin
原创
117
摘要:问题描述:我想在 Python 中不阻塞或锁定地使用 tail -F 或类似命令的输出。我在这里找到了一些非常古老的代码来执行此操作,但我想现在一定有更好的方法或库来做同样的事情。有人知道吗?理想情况下,tail.getNewData()每次我需要更多数据时,我都可以调用类似的东西。解决方案 1:非阻塞如果您...

问题描述:

我想在 Python 中不阻塞或锁定地使用 tail -F 或类似命令的输出。我在这里找到了一些非常古老的代码来执行此操作,但我想现在一定有更好的方法或库来做同样的事情。有人知道吗?

理想情况下,tail.getNewData()每次我需要更多数据时,我都可以调用类似的东西。


解决方案 1:

非阻塞

如果您使用的是 Linux(因为 Windows 不支持在文件上调用选择),您可以将子进程模块与选择模块一起使用。

import time
import subprocess
import select

f = subprocess.Popen(['tail','-F',filename],\n        stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p = select.poll()
p.register(f.stdout)

while True:
    if p.poll(1):
        print f.stdout.readline()
    time.sleep(1)

这将轮询输出管道以获取新数据,并在数据可用时将其打印出来。通常,time.sleep(1)print f.stdout.readline()将被替换为有用的代码。

阻塞

您可以使用子流程模块,而无需额外的选择模块调用。

import subprocess
f = subprocess.Popen(['tail','-F',filename],\n        stdout=subprocess.PIPE,stderr=subprocess.PIPE)
while True:
    line = f.stdout.readline()
    print line

这还将在添加新行时打印它们,但它将被阻止,直到尾部程序关闭,可能使用f.kill()

解决方案 2:

使用sh 模块(pip install sh):

from sh import tail
# runs forever
for line in tail("-f", "/var/log/some_log_file.log", _iter=True):
    print(line)

[更新]

由于带有_iter=True 的 sh.tail 是一个生成器,因此您可以:

import sh
tail = sh.tail("-f", "/var/log/some_log_file.log", _iter=True)

然后你可以使用以下命令“获取新数据”:

new_data = tail.next()

请注意,如果尾部缓冲区为空,它将被阻塞,直到有更多数据(从您的问题来看,不清楚您在这种情况下想要做什么)。

[更新]

如果将 -f 替换为 -F,则此方法有效,但在 Python 中,它将被锁定。如果可能的话,我更感兴趣的是拥有一个可以调用的函数,以便在需要时获取新数据。– Eli

容器生成器将尾部调用放置在 while True 循环内并捕获最终的 I/O 异常将具有与 -F 几乎相同的效果。

def tail_F(some_file):
    while True:
        try:
            for line in sh.tail("-f", some_file, _iter=True):
                yield line
        except sh.ErrorReturnCode_1:
            yield None

如果文件无法访问,生成器将返回 None。但是,如果文件可访问,它仍会阻塞,直到有新数据。在这种情况下,我仍然不清楚你想做什么。

Raymond Hettinger 的方法看起来相当不错:

def tail_F(some_file):
    first_call = True
    while True:
        try:
            with open(some_file) as input:
                if first_call:
                    input.seek(0, 2)
                    first_call = False
                latest_data = input.read()
                while True:
                    if '
' not in latest_data:
                        latest_data += input.read()
                        if '
' not in latest_data:
                            yield ''
                            if not os.path.isfile(some_file):
                                break
                            continue
                    latest_lines = latest_data.split('
')
                    if latest_data[-1] != '
':
                        latest_data = latest_lines[-1]
                    else:
                        latest_data = input.read()
                    for line in latest_lines[:-1]:
                        yield line + '
'
        except IOError:
            yield ''

如果文件无法访问或者没有新数据,该生成器将返回“”。

[更新]

每当数据用完时,倒数第二个答案似乎就会绕到文件顶部。– Eli

我认为第二个将在 tail 进程结束时输出最后十行,也就是-f在发生 I/O 错误时。tail --follow --retry在我能想到的大多数类 unix 环境中,行为与此相差无几。

也许如果你更新你的问题来解释你的真正目标是什么(你想要模仿尾部--重试的原因),你会得到更好的答案。

最后一个答案实际上并没有跟随尾部,而只是读取运行时可用的内容。 – Eli

当然,tail 默认会显示最后 10 行...您可以使用 file.seek 将文件指针定位到文件末尾,我将把一个正确的实现留给读者作为练习。

我认为 file.read() 方法比基于子进程的解决方案优雅得多。

解决方案 3:

使用非阻塞 readline() 的纯 Python 解决方案

我正在调整 Ijaz Ahmad Khan 的答案,以便只在完全写入时产生行(行以换行符结尾),从而提供一个没有外部依赖的 Pythonic 解决方案:

import time
from typing import Iterator

def follow(file, sleep_sec=0.1) -> Iterator[str]:
    """ Yield each line from a file as they are written.
    `sleep_sec` is the time to sleep after empty reads. """
    line = ''
    while True:
        tmp = file.readline()
        if tmp is not None and tmp != "":
            line += tmp
            if line.endswith("
"):
                yield line
                line = ''
        elif sleep_sec:
            time.sleep(sleep_sec)


if __name__ == '__main__':
    with open("test.txt", 'r') as file:
        for line in follow(file):
            print(line, end='')

解决方案 4:

事实上,对文件来说,唯一可移植的方式似乎是从文件中读取,如果返回 0 ,则重试(在 之后)。不同平台上的实用程序使用特定于平台的技巧(例如在 BSD 上)来有效地永远跟踪文件,而不需要。tail -f`sleepreadtailkqueuesleep`

因此,仅使用 Python 实现一个好的方案tail -f可能不是一个好主意,因为您必须使用最小公分母实现(而不诉诸特定于平台的黑客)。使用简单的subprocess打开tail -f并在单独的线程中迭代行,您可以轻松地在 Python 中实现非阻塞tail操作。

示例实现:

import threading, Queue, subprocess
tailq = Queue.Queue(maxsize=10) # buffer at most 100 lines

def tail_forever(fn):
    p = subprocess.Popen(["tail", "-f", fn], stdout=subprocess.PIPE)
    while 1:
        line = p.stdout.readline()
        tailq.put(line)
        if not line:
            break

threading.Thread(target=tail_forever, args=(fn,)).start()

print tailq.get() # blocks
print tailq.get_nowait() # throws Queue.Empty if there are no lines to read

解决方案 5:

所有使用 tail -f 的答案都不符合 Python 风格。

这是 Python 方式:(不使用外部工具或库)

def follow(thefile):
     while True:
        line = thefile.readline()
        if not line or not line.endswith('
'):
            time.sleep(0.1)
            continue
        yield line



if __name__ == '__main__':
    logfile = open("run/foo/access-log","r")
    loglines = follow(logfile)
    for line in loglines:
        print(line, end='')

解决方案 6:

所以,这来得有点晚了,但我又遇到了同样的问题,现在有一个更好的解决方案。只需使用pygtail:

Pygtail 读取尚未读取的日志文件行。它甚至可以处理已轮换的日志文件。基于 logcheck 的 logtail2 ( http://logcheck.org )

解决方案 7:

理想情况下,我会有类似 tail.getNewData() 的方法,每次我需要更多数据时都可以调用它

我们已经有一个了,它非常好。每当您需要更多数据时, 只需调用f.read()。它将从上一次读取停止的地方开始读取,并将读取到数据流的末尾:

f = open('somefile.log')
p = 0
while True:
    f.seek(p)
    latest_data = f.read()
    p = f.tell()
    if latest_data:
        print latest_data
        print str(p).center(10).center(80, '=')

要逐行读取,请使用f.readline()。有时,正在读取的文件将以部分读取的行结束。使用f.tell()处理这种情况,查找当前文件位置,并使用f.seek()将文件指针移回未完成行的开头。请参阅此 ActiveState 配方以获取有效代码。

解决方案 8:

您可以使用“tailer”库:https://pypi.python.org/pypi/tailer/

它有一个获取最后几行的选项:

# Get the last 3 lines of the file
tailer.tail(open('test.txt'), 3)
# ['Line 9', 'Line 10', 'Line 11']

它还可以关注文件:

# Follow the file as it grows
for line in tailer.follow(open('test.txt')):
    print line

如果想要类似尾巴的行为,那似乎是一个不错的选择。

解决方案 9:

另一个选择是tailhead提供 Python 版本tailhead实用程序以及可以在您自己的模块中使用的 API 的库。

最初基于tailer模块,其主要优点是能够通过路径跟踪文件,即它可以处理文件重新创建的情况。此外,它还针对各种边缘情况修复了一些错误。

解决方案 10:

如果您使用的是 Linux,您可以按照以下方式在 Python 中实现非阻塞实现。

import subprocess
subprocess.call('xterm -title log -hold -e \"tail -f filename\"&', shell=True, executable='/bin/csh')
print "Done"

解决方案 11:

Python 是“内置电池”的 - 它有一个很好的解决方案:https://pypi.python.org/pypi/pygtail

读取尚未读取的日志文件行。记住上次完成的位置,并从那里继续。

import sys
from pygtail import Pygtail

for line in Pygtail("some.log"):
    sys.stdout.write(line)

解决方案 12:

来自 pypi 应用程序 tailread 的简单 tail 函数

您也可以通过 pip install tailread 使用它

推荐用于大文件的尾部访问。

from io import BufferedReader


def readlines(bytesio, batch_size=1024, keepends=True, **encoding_kwargs):
    '''bytesio: file path or BufferedReader
       batch_size: size to be processed
    '''
    path = None
    
    if isinstance(bytesio, str):
        path = bytesio
        bytesio = open(path, 'rb')
    elif not isinstance(bytesio, BufferedReader):
        raise TypeError('The first argument to readlines must be a file path or a BufferedReader')

    bytesio.seek(0, 2)
    end = bytesio.tell()

    buf = b""
    for p in reversed(range(0, end, batch_size)):
        bytesio.seek(p)
        lines = []
        remain = min(end-p, batch_size)
        while remain > 0:
            line = bytesio.readline()[:remain]
            lines.append(line)
            remain -= len(line)

        cut, *parsed = lines
        for line in reversed(parsed):
            if buf:
                line += buf
                buf = b""
            if encoding_kwargs:
                line = line.decode(**encoding_kwargs)
            yield from reversed(line.splitlines(keepends))
        buf = cut + buf
    
    if path:
        bytesio.close()

    if encoding_kwargs:
        buf = buf.decode(**encoding_kwargs)
    yield from reversed(buf.splitlines(keepends))


for line in readlines('access.log', encoding='utf-8', errors='replace'):
    print(line)
    if 'line 8' in line:
        break

# line 11
# line 10
# line 9
# line 8

解决方案 13:

您还可以使用“AWK”命令。

更多信息请参见: http: //www.unix.com/shell-programming-scripting/41734-how-print-specific-lines-awk.html

awk 可用于尾随最后一行、最后几行或文件中的任意行。

这可以从 python 中调用。

解决方案 14:

# -*- coding:utf-8 -*-
import sys
import time


class Tail():
    def __init__(self, file_name, callback=sys.stdout.write):
        self.file_name = file_name
        self.callback = callback

    def follow(self, n=10):
        try:
            # 打开文件
            with open(self.file_name, 'r', encoding='UTF-8') as f:
            # with open(self.file_name,'rb') as f:
                self._file = f
                self._file.seek(0, 2)
                # 存储文件的字符长度
                self.file_length = self._file.tell()
                # 打印最后10行
                self.showLastLine(n)
                # 持续读文件 打印增量
                while True:
                    line = self._file.readline()
                    if line:
                        self.callback(line)
                    time.sleep(1)
        except Exception as e:
            print('打开文件失败,囧,看看文件是不是不存在,或者权限有问题')
            print(e)

    def showLastLine(self, n):
        # 一行大概100个吧 这个数改成1或者1000都行
        len_line = 100
        # n默认是10,也可以follow的参数传进来
        read_len = len_line * n
        # 用last_lines存储最后要处理的内容
        while True:
            # 如果要读取的1000个字符,大于之前存储的文件长度
            # 读完文件,直接break
            if read_len > self.file_length:
                self._file.seek(0)
                last_lines = self._file.read().split('
')[-n:]
                break
            # 先读1000个 然后判断1000个字符里换行符的数量
            self._file.seek(-read_len, 2)
            last_words = self._file.read(read_len)
            # count是换行符的数量
            count = last_words.count('
')

            if count >= n:
                # 换行符数量大于10 很好处理,直接读取
                last_lines = last_words.split('
')[-n:]
                break
            # 换行符不够10个
            else:
                # break
                # 不够十行
                # 如果一个换行符也没有,那么我们就认为一行大概是100个
                if count == 0:

                    len_perline = read_len
                # 如果有4个换行符,我们认为每行大概有250个字符
                else:
                    len_perline = read_len / count
                # 要读取的长度变为2500,继续重新判断
                read_len = len_perline * n
        for line in last_lines:
            self.callback(line + '
')


if __name__ == '__main__':
    py_tail = Tail('test.txt')
    py_tail.follow(1)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1590  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1361  
  信创产品在政府采购中的占比分析随着信息技术的飞速发展以及国家对信息安全重视程度的不断提高,信创产业应运而生并迅速崛起。信创,即信息技术应用创新,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。政府采购作为推动信创产业发展的重要力量,其对信创产品的采购占比情况备受关注。这不仅关系到信创产业的发展前...
信创和国产化的区别   18  
  信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,摆脱对国外技术的依赖。近年来,国货国用信创发展势头迅猛,在诸多领域取得了显著成果。这一发展趋势对科技创新产生了深远的推动作用,不仅提升了我国在信息技术领域的自主创新能力,还为经济社会的数字化转型提供了坚实支撑。信创推动核心技术突破信创产业的发展促使企业和科研...
信创工作   18  
  信创技术,即信息技术应用创新产业,旨在实现信息技术领域的自主可控与安全可靠。近年来,信创技术发展迅猛,对中小企业产生了深远的影响,带来了诸多不可忽视的价值。在数字化转型的浪潮中,中小企业面临着激烈的市场竞争和复杂多变的环境,信创技术的出现为它们提供了新的发展机遇和支撑。信创技术对中小企业的影响技术架构变革信创技术促使中...
信创国产化   19  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用