Subprocess.Popen: clonación de stdout y stderr tanto en terminal como en variables

Resuelto Łukasz Zdun asked hace 11 años • 4 respuestas

¿Es posible modificar el código siguiente para que lo que se escriba en 'stdout' y 'stderr' quede:

  • impreso en el terminal (en tiempo real),
  • y finalmente almacenado en variables outs y errs ?

El código:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess

def run_cmd(command, cwd=None):
    """Run *command* to completion and capture everything it writes.

    The child is started without a shell, optionally inside *cwd*, with
    both stdout and stderr connected to pipes.

    Returns a ``(returncode, (stdout_text, stderr_text))`` tuple; both
    streams are decoded as UTF-8.
    """
    child = subprocess.Popen(
        command,
        cwd=cwd,
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the child to exit.
    raw_out, raw_err = child.communicate()
    exit_code = child.returncode
    captured = (raw_out.decode('utf-8'), raw_err.decode('utf-8'))
    return (exit_code, captured)

Gracias a @unutbu y un agradecimiento especial a @jf-sebastian; esta es la función final:

#!/usr/bin/python3
# -*- coding: utf-8 -*-


import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    """Feed every line read from *pipe* to each callable in *funcs*.

    Args:
        pipe: Binary file object; it is read with ``readline`` until that
            returns ``b''`` (end of file).
        funcs: Iterable of one-argument callables. Each receives every
            line, decoded as UTF-8, in the order given.

    The pipe is always closed before returning, including when one of the
    callables (or the decoding) raises.
    """
    try:
        for raw_line in iter(pipe.readline, b''):
            # Decode once per line rather than once per callable.
            line = raw_line.decode('utf-8')
            for func in funcs:
                func(line)
    finally:
        pipe.close()


def write_output(get):
    """Copy items to standard output until a ``None`` sentinel arrives.

    *get* is called with no arguments over and over; each value it returns
    is written to ``sys.stdout``. The function returns as soon as *get*
    returns ``None``.
    """
    while True:
        item = get()
        if item is None:
            return
        sys.stdout.write(item)


def run_cmd(command, cwd=None, passthrough=True):
    """Run *command*, capturing its output and optionally echoing it live.

    Args:
        command: Argument list handed to ``Popen`` (no shell is used).
        cwd: Working directory for the child, or ``None`` to inherit.
        passthrough: When true, every line the child writes to stdout or
            stderr is also written to this process's ``sys.stdout`` as it
            arrives (through ``write_output``). When false the output is
            only captured.

    Returns:
        ``(returncode, (stdout_text, stderr_text))`` with both streams
        decoded as UTF-8.
    """
    # ``bufsize=1`` is deliberately not passed: the pipes are binary, where
    # line buffering is unsupported and Python 3.8+ emits a RuntimeWarning
    # before falling back to the default buffer size anyway.
    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
    )

    if not passthrough:
        raw_out, raw_err = proc.communicate()
        outs = '' if raw_out is None else raw_out.decode('utf-8')
        errs = '' if raw_err is None else raw_err.decode('utf-8')
        return (proc.returncode, (outs, errs))

    out_lines, err_lines = [], []

    # Both readers push into one queue so a single writer serialises the
    # echo and lines from the two streams never interleave mid-line.
    q = Queue()

    stdout_thread = Thread(
        target=read_output, args=(proc.stdout, [q.put, out_lines.append])
    )
    stderr_thread = Thread(
        target=read_output, args=(proc.stderr, [q.put, err_lines.append])
    )
    writer_thread = Thread(target=write_output, args=(q.get,))

    for t in (stdout_thread, stderr_thread, writer_thread):
        t.daemon = True
        t.start()

    proc.wait()

    # The readers finish once they hit EOF on their pipe.
    for t in (stdout_thread, stderr_thread):
        t.join()

    # Sentinel stops the writer; joining it guarantees everything queued
    # has been echoed before this function returns.
    q.put(None)
    writer_thread.join()

    # Captured lines keep their own newline, so join without a separator.
    outs = ''.join(out_lines)
    errs = ''.join(err_lines)

    return (proc.returncode, (outs, errs))
Łukasz Zdun avatar Jun 19 '13 18:06 Łukasz Zdun
Aceptado

Para capturar y mostrar al mismo tiempo stdout y stderr de un proceso hijo, línea por línea y en un solo hilo, puede usar E/S asíncrona:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE

async def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines.

    Args:
        stream: Object whose awaitable ``readline()`` returns bytes and an
            empty value at end of file.
        display: Callable invoked synchronously with each line read.

    Returns:
        All lines read, concatenated into a single ``bytes`` object.

    Written as a native coroutine because the ``@asyncio.coroutine``
    decorator no longer exists in Python 3.11+.
    """
    output = []
    while True:
        line = await stream.readline()
        if not line:
            break
        output.append(line)
        display(line)  # assume it doesn't block
    return b''.join(output)

async def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive
    (line by line).

    Args:
        *cmd: Program and arguments passed to
            ``asyncio.create_subprocess_exec``.

    Returns:
        ``(returncode, stdout_bytes, stderr_bytes)``.

    Written as a native coroutine because the ``@asyncio.coroutine``
    decorator no longer exists in Python 3.11+.
    """
    # start process
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=PIPE, stderr=PIPE)

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        # Only kill a child that is still running; killing one that has
        # already exited would raise and mask the original error.
        if process.returncode is None:
            process.kill()
        raise
    finally:
        # wait for the process to exit
        rc = await process.wait()
    return rc, stdout, stderr

# run the event loop
# NOTE: `cmd` (the program and its arguments) is not defined in this
# snippet; it must be set before this point.
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
else:
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    rc, *output = loop.run_until_complete(read_and_display(*cmd))
finally:
    # Close the loop even if the command could not be started or failed.
    loop.close()
jfs avatar Sep 21 '2014 15:09 jfs

Puede crear hilos para leer las tuberías (pipes) stdout y stderr, escribir en una cola común y agregar las líneas a listas. Luego use un tercer hilo para imprimir los elementos de la cola.

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    """Pass every line read from *pipe* to each callable in *funcs*.

    Args:
        pipe: File object read with ``readline`` until it returns an empty
            value (end of file).
        funcs: Iterable of one-argument callables; each receives every
            line, unmodified, in the order given.

    The pipe is always closed before returning, including when one of the
    callables raises.
    """
    try:
        while True:
            line = pipe.readline()
            # An empty result ('' or b'') means EOF; a blank line still
            # carries its newline, so it is never mistaken for EOF.
            if not line:
                break
            for func in funcs:
                func(line)
    finally:
        pipe.close()

def write_output(get):
    """Write values obtained from *get* to ``sys.stdout`` until ``None``.

    *get* is called with no arguments; each non-``None`` result is written
    to standard output, and the first ``None`` ends the loop.
    """
    line = get()
    while line is not None:
        sys.stdout.write(line)
        line = get()

# Start the child with stdout and stderr connected to separate pipes.
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
# One reader per pipe: each line goes to the shared queue (for display)
# and to that stream's own list (for capture).
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
# A single writer drains the queue, so the echoed lines are serialised.
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
# The sentinel stops the writer; wait for it so every queued line has been
# echoed before the captured lists are printed.
q.put(None)
twrite.join()
print(out)
print(err)

La razón para usar el tercer hilo, en lugar de permitir que los dos primeros hilos impriman directamente en la terminal, es evitar que ambas sentencias de impresión se ejecuten simultáneamente, lo que a veces puede producir texto entremezclado.


Lo anterior llama a random_print.py, que imprime en stdout y stderr al azar:

import sys
import time
import random

# Emit the numbers 0..49, one per line, each on a randomly chosen stream.
for number in range(50):
    stream = random.choice([sys.stdout, sys.stderr])
    stream.write('%d\n' % number)
    # Flush after every line so the output is not held in a buffer.
    stream.flush()
    time.sleep(0.1)

Esta solución toma prestado código e ideas de JF Sebastian, aquí.


Aquí hay una solución alternativa para sistemas tipo Unix, usando select.select:

import collections
import errno
import fcntl
import os
import Queue
import select
import subprocess
import sys
import threading
import time
PIPE = subprocess.PIPE

def make_async(fd):
    """Add the ``O_NONBLOCK`` flag to *fd*, keeping its other status flags."""
    # Approach taken from https://stackoverflow.com/a/7730201/190597
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

def read_async(fd):
    """Read whatever is currently available from a non-blocking file object.

    Args:
        fd: Object exposing ``read()``. Despite the name, this is a file
            object, not a descriptor number.

    Returns:
        The data returned by ``fd.read()``, or ``''`` when nothing is
        available, i.e. ``read()`` returned ``None`` or failed with
        ``EAGAIN``.

    Raises:
        IOError: Any read error other than ``EAGAIN`` is propagated.
    """
    # Approach taken from https://stackoverflow.com/a/7730201/190597
    try:
        data = fd.read()
    except IOError as e:
        if e.errno != errno.EAGAIN:
            raise  # bare raise keeps the original traceback
        return ''
    # A non-blocking read may report "no data yet" by returning None.
    return '' if data is None else data

def write_output(fds, outmap):
    """Echo and record whatever can currently be read from each of *fds*.

    For every file object in *fds*, the data returned by ``read_async`` is
    written to ``sys.stdout`` and then appended to
    ``outmap[<that object's fileno()>]``.
    """
    for stream in fds:
        chunk = read_async(stream)
        sys.stdout.write(chunk)
        captured = outmap[stream.fileno()]
        captured.append(chunk)

# Start the child with stdout and stderr connected to separate pipes.
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

# Set O_NONBLOCK on both pipes before reading from them.
make_async(process.stdout)
make_async(process.stderr)
# Captured chunks, keyed by the file descriptor number they were read from.
outmap = collections.defaultdict(list)
while True:
    # Wait until select reports at least one of the pipes as readable.
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        # The child has exited: read both pipes one last time, then stop.
        write_output([process.stdout, process.stderr], outmap)
        break

# Map stream names to descriptor numbers so the captured chunks can be
# looked up in outmap.
fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

Esta solución utiliza código e ideas de la publicación de Adam Rosenfield, aquí.

unutbu avatar Jun 19 '2013 12:06 unutbu