非阻塞控制台输入?

2024-12-13 08:37:00
admin
原创
78
摘要:问题描述:我正在尝试用 Python 制作一个简单的 IRC 客户端(作为我学习该语言时的一个项目)。我有一个循环,用于接收和解析 IRC 服务器发送给我的内容,但是如果我用它raw_input来输入东西,它就会停止循环,直到我输入一些东西(显然)。我怎样才能输入一些内容而不停止循环?(我认为我不需要发布代码...

问题描述:

我正在尝试用 Python 制作一个简单的 IRC 客户端(作为我学习该语言时的一个项目)。

我有一个循环,用于接收和解析 IRC 服务器发送给我的内容,但是如果我用它raw_input来输入东西,它就会停止循环,直到我输入一些东西(显然)。

我怎样才能输入一些内容而不停止循环?

(我认为我不需要发布代码,我只是想在不while 1:停止循环的情况下输入一些内容。)

我在 Windows 上。


解决方案 1:

对于 Windows,仅限控制台,使用msvcrt模块:

import msvcrt

num = 0
done = False
while not done:
    print(num)
    num += 1

    if msvcrt.kbhit():
        print "you pressed",msvcrt.getch(),"so now i will quit"
        done = True

对于Linux,本文介绍了以下解决方案,它需要termios模块:

import sys
import select
import tty
import termios

def isData():
    return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])

old_settings = termios.tcgetattr(sys.stdin)
try:
    tty.setcbreak(sys.stdin.fileno())

    i = 0
    while 1:
        print(i)
        i += 1

        if isData():
            c = sys.stdin.read(1)
            if c == 'x1b':         # x1b is ESC
                break

finally:
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

对于跨平台,或者如果您也想要 GUI,您可以使用 Pygame:

import pygame
from pygame.locals import *

def display(str):
    text = font.render(str, True, (255, 255, 255), (159, 182, 205))
    textRect = text.get_rect()
    textRect.centerx = screen.get_rect().centerx
    textRect.centery = screen.get_rect().centery

    screen.blit(text, textRect)
    pygame.display.update()

pygame.init()
screen = pygame.display.set_mode( (640,480) )
pygame.display.set_caption('Python numbers')
screen.fill((159, 182, 205))

font = pygame.font.Font(None, 17)

num = 0
done = False
while not done:
    display( str(num) )
    num += 1

    pygame.event.pump()
    keys = pygame.key.get_pressed()
    if keys[K_ESCAPE]:
        done = True

解决方案 2:

这是我见过的最棒的解决方案1。粘贴在此处以防链接失效:

#!/usr/bin/env python
'''
A Python class implementing KBHIT, the standard keyboard-interrupt poller.
Works transparently on Windows and Posix (Linux, Mac OS X).  Doesn't work
with IDLE.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as 
published by the Free Software Foundation, either version 3 of the 
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

'''

import os

# Windows
if os.name == 'nt':
    import msvcrt

# Posix (Linux, OS X)
else:
    import sys
    import termios
    import atexit
    from select import select


class KBHit:

    def __init__(self):
        '''Creates a KBHit object that you can call to do various keyboard things.
        '''

        if os.name == 'nt':
            pass

        else:

            # Save the terminal settings
            self.fd = sys.stdin.fileno()
            self.new_term = termios.tcgetattr(self.fd)
            self.old_term = termios.tcgetattr(self.fd)

            # New terminal setting unbuffered
            self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO)
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

            # Support normal-terminal reset at exit
            atexit.register(self.set_normal_term)


    def set_normal_term(self):
        ''' Resets to normal terminal.  On Windows this is a no-op.
        '''

        if os.name == 'nt':
            pass

        else:
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)


    def getch(self):
        ''' Returns a keyboard character after kbhit() has been called.
            Should not be called in the same program as getarrow().
        '''

        s = ''

        if os.name == 'nt':
            return msvcrt.getch().decode('utf-8')

        else:
            return sys.stdin.read(1)


    def getarrow(self):
        ''' Returns an arrow-key code after kbhit() has been called. Codes are
        0 : up
        1 : right
        2 : down
        3 : left
        Should not be called in the same program as getch().
        '''

        if os.name == 'nt':
            msvcrt.getch() # skip 0xE0
            c = msvcrt.getch()
            vals = [72, 77, 80, 75]

        else:
            c = sys.stdin.read(3)[2]
            vals = [65, 67, 66, 68]

        return vals.index(ord(c.decode('utf-8')))


    def kbhit(self):
        ''' Returns True if keyboard character was hit, False otherwise.
        '''
        if os.name == 'nt':
            return msvcrt.kbhit()

        else:
            dr,dw,de = select([sys.stdin], [], [], 0)
            return dr != []


# Test    
if __name__ == "__main__":

    kb = KBHit()

    print('Hit any key, or ESC to exit')

    while True:

        if kb.kbhit():
            c = kb.getch()
            if ord(c) == 27: # ESC
                break
            print(c)

    kb.set_normal_term()

1由Simon D. Levy
制作,是他编写的软件汇编的一部分,并根据Gnu 宽通用公共许可证发布。

解决方案 3:

我最喜欢获取非阻塞输入的方法是在线程中使用 python input():

import threading

class KeyboardThread(threading.Thread):

    def __init__(self, input_cbk = None, name='keyboard-input-thread'):
        self.input_cbk = input_cbk
        super(KeyboardThread, self).__init__(name=name, daemon=True)
        self.start()

    def run(self):
        while True:
            self.input_cbk(input()) #waits to get input + Return

showcounter = 0 #something to demonstrate the change

def my_callback(inp):
    #evaluate the keyboard input
    print('You Entered:', inp, ' Counter is at:', showcounter)

#start the Keyboard thread
kthread = KeyboardThread(my_callback)

while True:
    #the normal program executes without blocking. here just counting up
    showcounter += 1

独立于操作系统,仅内部库,支持多字符输入

解决方案 4:

这里有一个在 Linux 和 Windows 下使用单独线程运行的解决方案:

import sys
import threading
import time
import Queue

def add_input(input_queue):
    while True:
        input_queue.put(sys.stdin.read(1))

def foobar():
    input_queue = Queue.Queue()

    input_thread = threading.Thread(target=add_input, args=(input_queue,))
    input_thread.daemon = True
    input_thread.start()

    last_update = time.time()
    while True:

        if time.time()-last_update>0.5:
            sys.stdout.write(".")
            last_update = time.time()

        if not input_queue.empty():
            print "
input:", input_queue.get()

foobar()

解决方案 5:

在 Linux 上,这里是对 mizipzor 代码的重构,可以使这个过程变得更容易一些,以防您必须在多个地方使用此代码。

import sys
import select
import tty
import termios

class NonBlockingConsole(object):

    def __enter__(self):
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, type, value, traceback):
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)


    def get_data(self):
        if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
            return sys.stdin.read(1)
        return False

使用方法如下:此代码将打印一个计数器,该计数器不断增长,直到您按下 ESC 键。

with NonBlockingConsole() as nbc:
    i = 0
    while 1:
        print i
        i += 1
        if nbc.get_data() == 'x1b':  # x1b is ESC
            break

解决方案 6:

我认为 curses 库可以提供帮助。

import curses
import datetime

stdscr = curses.initscr()
curses.noecho()
stdscr.nodelay(1) # set getch() non-blocking

stdscr.addstr(0,0,"Press \"p\" to show count, \"q\" to exit...")
line = 1
try:
    while 1:
        c = stdscr.getch()
        if c == ord('p'):
            stdscr.addstr(line,0,"Some text here")
            line += 1
        elif c == ord('q'): break

        """
        Do more things
        """

finally:
    curses.endwin()

解决方案 7:

....回到最初的问题...

我也在学习 Python,这让我花了很多时间阅读文档和示例,也花了很多脑力...但我认为我找到了一个简单、简短且兼容的解决方案...只需使用输入、列表和线程

'''
what i thought:
- input() in another thread
- that were filling a global strings list
- strings are being popped in the main thread
'''

import threading

consoleBuffer = []

def consoleInput(myBuffer):
  while True:
    myBuffer.append(input())
 
threading.Thread(target=consoleInput, args=(consoleBuffer,), daemon=True).start() # start the thread

import time # just to demonstrate non blocking parallel processing

while True:
  time.sleep(2) # avoid 100% cpu
  print(time.time()) # just to demonstrate non blocking parallel processing
  while consoleBuffer:
    print(repr(consoleBuffer.pop(0)))

直到这是我找到的最简单和兼容的方法,请注意默认情况下 stdin stdout 和 stderr 共享同一个终端,因此如果在您输入时控制台上打印了某些内容,则输入的“本地回显”可能看起来不一致,但是按下回车键后,输入的字符串会被很好地接收...如果您不想要/喜欢这种行为,请找到一种方法来分离输入/输出区域,如重定向,或尝试其他解决方案,如 curses、tkinter、pygame 等。

奖励:ctrl-c按键可以轻松处理

try:
  # do whatever
except KeyboardInterrupt:
  print('cancelled by user') or exit() # overload

解决方案 8:

如果您只想从循环中“退出”,您可以拦截 Ctrl-C 信号。

这是跨平台的并且非常简单!

import signal
import sys

def signal_handler(sig, frame):
    print('You pressed Ctrl+C!')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
while True:
    # do your work here

解决方案 9:

我会按照 Mickey Chan 所说的去做,但我会用它unicurses来代替普通的诅咒。
Unicurses是通用的(适用于所有或至少几乎所有操作系统)

解决方案 10:

使用 python3.3 及以上版本,您可以使用asyncio此答案中提到的模块。不过,您必须重构代码才能使用asyncio
使用 python asyncio.create_server 实例提示用户输入

解决方案 11:

由于我发现上面的一个答案很有帮助,所以这里有一个类似方法的示例。此代码在输入时创建了节拍器效果。

不同之处在于此代码使用闭包而不是类,这对我来说感觉更直接一些。此示例还包含一个标志,用于通过 终止线程my_thread.stop = True,但不使用全局变量。我通过(滥用)利用 Python 函数是对象这一事实来实现这一点,因此甚至可以从其内部对其进行 monkey-patched。

注意:停止线程时应谨慎。如果您的线程中有需要某种清理过程的数据,或者该线程产生了自己的线程,则此方法将毫不留情地终止这些进程。

# Begin metronome sound while accepting input.
# After pressing enter, turn off the metronome sound.
# Press enter again to restart the process.

import threading
import time
import winsound  # Only on Windows

beat_length = 1  # Metronome speed


def beat_thread():
    beat_thread.stop = False  # Monkey-patched flag
    frequency, duration = 2500, 10
    def run():  # Closure
        while not beat_thread.stop:  # Run until flag is True
            winsound.Beep(frequency, duration)
            time.sleep(beat_length - duration/1000)
    threading.Thread(target=run).start()


while True:
    beat_thread()
    input("Input with metronome. Enter to finish.
")
    beat_thread.stop = True  # Flip monkey-patched flag
    input("Metronome paused. Enter to continue.

")

解决方案 12:

以下是围绕上述解决方案之一的类包装器:

#!/usr/bin/env python3

import threading

import queue

class NonBlockingInput:

    def __init__(self, exit_condition):
        self.exit_condition = exit_condition
        self.input_queue = queue.Queue()
        self.input_thread = threading.Thread(target=self.read_kbd_input, args=(), daemon=True)
        self.input_thread.start()

    def read_kbd_input(self):
        done_queueing_input = False
        while not done_queueing_input:
            console_input = input()
            self.input_queue.put(console_input)
            if console_input.strip() == self.exit_condition:
                done_queueing_input = True

    def input_queued(self):
        return_value = False
        if self.input_queue.qsize() > 0:
            return_value = True
        return return_value

    def input_get(self):
        return_value = ""
        if self.input_queue.qsize() > 0:
            return_value = self.input_queue.get()
        return return_value

if __name__ == '__main__':

    NON_BLOCK_INPUT = NonBlockingInput(exit_condition='quit')

    DONE_PROCESSING = False
    INPUT_STR = ""
    while not DONE_PROCESSING:
        if NON_BLOCK_INPUT.input_queued():
            INPUT_STR = NON_BLOCK_INPUT.input_get()
            if INPUT_STR.strip() == "quit":
                DONE_PROCESSING = True
            else:
                print("{}".format(INPUT_STR))

解决方案 13:

下面的示例确实允许在 Windows(仅在 Windows 10 下测试)和 Linux 下从 stdin 进行非阻塞读取,而无需外部依赖项或使用线程。它适用于复制粘贴的文本,它禁用 ECHO,因此它可以用于某种自定义 UI 并使用循环,因此可以轻松处理输入的任何内容。

考虑到上述情况,该示例适用于交互式 TTY,而不是管道输入。

#!/usr/bin/env python3
import sys

if(sys.platform == "win32"):
    import msvcrt
    import ctypes
    from ctypes import wintypes
    kernel32 = ctypes.windll.kernel32
    oldStdinMode = ctypes.wintypes.DWORD()
    # Windows standard handle -10 refers to stdin
    kernel32.GetConsoleMode(kernel32.GetStdHandle(-10), ctypes.byref(oldStdinMode))
    # Disable ECHO and line-mode
    # https://learn.microsoft.com/en-us/windows/console/setconsolemode
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 0)
else:
    # POSIX uses termios
    import select, termios, tty
    oldStdinMode = termios.tcgetattr(sys.stdin)
    _ = termios.tcgetattr(sys.stdin)
    # Disable ECHO and line-mode
    _[3] = _[3] & ~(termios.ECHO | termios.ICANON)
    # Don't block on stdin.read()
    _[6][termios.VMIN] = 0
    _[6][termios.VTIME] = 0
    termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, _)

def readStdin():
    if(sys.platform == "win32"):
        return msvcrt.getwch() if(msvcrt.kbhit()) else ""
    else:
        return sys.stdin.read(1)

def flushStdin():
    if(sys.platform == "win32"):
        kernel32.FlushConsoleInputBuffer(kernel32.GetStdHandle(-10))
    else:
        termios.tcflush(sys.stdin, termios.TCIFLUSH)

try:
    userInput = ""
    print("Type something: ", end = "", flush = True)
    flushStdin()
    while 1:
        peek = readStdin()
        if(len(peek) > 0):
            # Stop input on NUL, Ctrl+C, ESC, carriage return, newline, backspace, EOF, EOT
            if(peek not in ["", "", "x1b", "
", "
", "", "x1a", ""]):
                userInput += peek
                # This is just to show the user what they typed.
                # Can be skipped, if one doesn't need this.
                sys.stdout.write(peek)
                sys.stdout.flush()
            else:
                break
    flushStdin()
    print(f"
userInput length: {len(userInput)}, contents: \"{userInput}\"")
finally:
    if(sys.platform == "win32"):
        kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), oldStdinMode)
    else:
        termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, oldStdinMode)

解决方案 14:

我正在使用 Linux 编写一个程序,该程序具有更大的主循环,需要定期更新,但也需要以非阻塞方式读取字符。但重置显示器也会丢失输入缓冲区。这是我想出的解决方案。每次屏幕更新后,它都会将终端设置为非阻塞,等待主循环通过,然后解释 stdin。之后,终端将重置为原始设置。

#!/usr/bin/python3
import sys, select, os, tty, termios, time

i = 0
l = True
oldtty = termios.tcgetattr(sys.stdin)
stdin_no = sys.stdin.fileno()

while l:
    os.system('clear')
    print("I'm doing stuff. Press a 'q' to stop me!")
    print(i)
    tty.setcbreak(stdin_no)
    time.sleep(0.5)
    if sys.stdin in select.select([sys.stdin], [], [], 0.0)[0]:
        line = sys.stdin.read(1)
        print (line, len(line))
        
        if "q" in line:
            l = False
        else: 
            pass
    termios.tcsetattr(stdin_no, termios.TCSADRAIN, oldtty)
    i += 1


解决方案 15:

marco 的解决方案是正确的想法,但我决定将其简化为尽可能少的代码,而不使用任何类。它还实际上向您展示了如何使用队列库获取用户输入,而不仅仅是打印它:

import time, threading, queue


def collect(que):
    msg = input()
    que.put(msg)

que = queue.Queue()
thread = threading.Thread(target=collect, args=[que])
thread.start()

while thread.is_alive():
    time.sleep(1)
    print("The main thread continues while we wait for you...")

msg = que.get()
print('You typed:', msg)

在这个例子中,主线程无限期地继续(处理数据或其他),同时定期检查用户是否在生成的线程中输入了任何数据。当发生这种情况时,它会返回用户输入。

我已经在自己的脚本中成功地使用了这个想法来创建调试器,在主循环的任何时候我都可以输入“打印变量名称”,它会实时地给我提供值而不会停止。

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1043  
  IPD(Integrated Product Development,集成产品开发)是一种系统化的产品开发方法论,旨在通过跨职能团队的协作,优化产品开发的效率和质量。IPD流程强调从市场需求出发,通过并行工程、跨部门协作和阶段性评审,确保产品从概念到上市的每个环节都高效且可控。随着敏捷开发方法的普及,越来越多的企业开始...
华为IPD流程   41  
  随着企业产品开发复杂度的提升以及市场需求的快速变化,传统的产品开发模式逐渐显现出局限性。集成产品开发(IPD)流程与敏捷开发(Agile Development)作为两种主流的开发方法论,分别从系统化管理和快速响应需求的角度为企业提供了解决方案。然而,单独使用其中一种方法往往无法完全满足企业在效率、质量和创新上的多重需...
华为IPD流程   35  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司成功的关键因素之一。它不仅帮助华为在技术上实现了快速创新,还通过市场导向确保了产品的商业成功。IPD流程通过整合技术与市场双驱动,实现了从需求定义到产品交付的全生命周期管理。这种模式不仅提高了产品的开发效率,还降低了市...
IPD流程中PDCP是什么意思   32  
  在研发领域,集成产品开发(IPD)流程已经成为企业提升创新效率和市场竞争力的重要手段。然而,资源分配的不合理往往是制约IPD流程效率的关键因素之一。无论是人力资源、财务资源还是技术资源,如何高效分配直接关系到项目的成功与否。优化资源分配不仅能够缩短产品开发周期,还能降低研发成本,提升产品的市场竞争力。因此,掌握资源分配...
IPD流程中CDCP   34  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用