我正在一个脚本上运行一个自动化程序,我希望在shell实例被写入文件之前对它的输出进行评估。
bashCommand = "ls -a"
middleman = Middleman("Generic text", function_option, function)
process = subprocess.Popen(bashCommand.split(), stdout=middleman)
output, error = process.communicate()middleman类的目的是计算bashCommand的每一行输出,然后,如果它在行中找到给定的文本,则使用给定的设置执行函数。
这里是中间商阶层:
import io
class Middleman(io.TextIOBase):
def __init__(self, target_text, target_settings, target_function):
self.target_text = target_text
self.target_function = target_function
self.target_settings = target_settings
def write(self, text, *args, **kwargs):
if self.target_text in text:
self.target_function(self.target_settings)
print("Middleman:", "Function applied!")
print("Middleman:", text)
else:
print("Middleman:", text)
def fileno(self, *args, **kwargs):
return 1问题是,子进程标准输出不使用.write()作为向TextIOWrapper写入的方法,有人知道我如何才能达到我想要的效果吗?
谢谢。
发布于 2022-04-22 00:04:07
与其让中间商成为TextIOBase的子代并将其作为标准输出传递,您还可以使用stdout=PIPE并让它调用process.stdout.read()。
class Middleman:
def __init__(self, target_text, target_settings, target_function):
self.target_text = target_text
self.target_function = target_function
self.target_settings = target_settings
def read(self, text_stream):
for text in text_stream:
if self.target_text in text:
self.target_function(self.target_settings)
print("Middleman:", "Function applied!")
print("Middleman:", text)
else:
print("Middleman:", text)
bashCommand = "ls -a"
middleman = Middleman("Generic text", function_option, function)
process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
middleman.read(process.stdout)
output, error = process.communicate()您还必须处理累积的输出。因为我们在中间商中消耗了所有的输出,所以output可能是空的。
https://stackoverflow.com/questions/68113171
复制相似问题