Design overview:
Built on two libraries, tornado (async IO) and streamz (stream processing); requires Redis 5.0 or later, since the code relies on Redis Streams.
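Before wiring anything up it is worth confirming the Redis version, because Streams (XADD, consumer groups) only exist from 5.0 on. A minimal check, assuming a local Redis reachable with default settings (walrus.Database is a redis-py client subclass, so INFO is available on it):
from walrus import Database

db = Database()  # assumes localhost:6379, no password
version = db.info('server')['redis_version']
assert int(version.split('.')[0]) >= 5, 'Redis Streams need Redis >= 5.0, got %s' % version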
In [20]:
import os
import weakref
import time
import tornado.ioloop
from tornado import gen
from streamz.core import Stream, convert_interval
import pandas as pd
class Source(Stream):
    _graphviz_shape = 'doubleoctagon'
An asynchronous periodic task that keeps generating data and pushes it to a Redis stream.
In [21]:
@Stream.register_api(staticmethod)
class engine(Source):
    def __init__(self, topic,
                 push_interval=1,
                 start=False,
                 func=lambda: {'time': time.time()},
                 asyncflag=False,
                 threadcount=5,
                 **kwargs):
        self.producer = None
        self.topic = topic
        self.push_interval = push_interval
        self.func = func
        self.asyncflag = asyncflag
        if self.asyncflag:
            from concurrent.futures import ThreadPoolExecutor
            self.thread_pool = ThreadPoolExecutor(threadcount)
        super(engine, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_push(self):
        # call the user-supplied func and append the resulting dict to the Redis stream (XADD)
        if self.producer is not None:
            msg = self.producer.add(self.func())
            if msg:
                return msg

    @gen.coroutine
    def push_redis(self):
        while True:
            if self.asyncflag:
                # hand a potentially blocking func off to the thread pool so the IOLoop keeps running
                val = self.thread_pool.submit(self.do_push)
            else:
                val = self.do_push()
            yield gen.sleep(self.push_interval)
            if self.stopped:
                break

    def start(self):
        from walrus import Database
        import distributed
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.producer = self.db.Stream(self.topic)
            self.stopped = False
            self.loop.add_callback(self.push_redis)

    def stop(self):
        if self.producer is not None:
            self.producer = None
            self.stopped = True
            self.finalize(self, self.stop, weakref.ref(self))
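Each call to func() becomes one entry in the Redis stream (walrus Stream.add boils down to an XADD). A quick way to peek at what an engine has written, sketched against a local Redis using the plain redis-py commands that walrus.Database exposes (assumes something has already been pushed to 'stream-a'):
from walrus import Database

db = Database()
print(db.xlen('stream-a'))              # number of entries in the stream
print(db.xrange('stream-a', count=3))   # a few (message_id, fields) pairs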
Read the stream data back out of Redis and expose it as a streamz Stream object.
In [22]:
@Stream.register_api(staticmethod)
class from_redis(Source):
    def __init__(self, topics, poll_interval=0.1, start=False, group='test',
                 **kwargs):
        self.consumer = None
        self.topics = topics
        self.group = group
        self.poll_interval = poll_interval
        super(from_redis, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_poll(self):
        if self.consumer is not None:
            msg = self.consumer.read()
            if msg:
                return msg

    @gen.coroutine
    def poll_redis(self):
        while True:
            val = self.do_poll()
            if val:
                yield self._emit(val)
            else:
                yield gen.sleep(self.poll_interval)
            if self.stopped:
                break

    def start(self):
        from walrus import Database
        import distributed
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.consumer = self.db.consumer_group(self.group, self.topics)
            self.consumer.create()  # create the consumer group if it does not exist yet
            # self.consumer.set_id('$')  # uncomment to skip the backlog and only read new entries
            self.stopped = False
            self.loop.add_callback(self.poll_redis)

    def stop(self):
        if self.consumer is not None:
            self.consumer.destroy()
            self.consumer = None
            self.stopped = True
            self.finalize(self, self.stop, weakref.ref(self))
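What poll_redis emits is whatever ConsumerGroup.read() returned; the parse helper in the next cell assumes it is a list of (stream_name, [(message_id, fields_dict), ...]) pairs, roughly of this shape (illustrative values only):
# [(b'stream-a', [(b'1554627600000-0', {b'test': b'42'})])]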
In [50]:
def parse(meta_msg):
    # meta_msg is (stream_name, [(message_id, fields_dict), ...]); keep only the first message body
    topic, msg = meta_msg[0], meta_msg[1][0]
    msg_id, msg_body = msg
    return msg_body

def ismykey(byte_dict):
    return byte_dict.get(b'quant_df') is not None

def isotherkey(byte_dict):
    return byte_dict.get(b'quant_df') is None

def get_quant(byte_dict):
    return byte_dict.get(b'quant_df')

def isdf(msg):
    return isinstance(msg, pd.DataFrame)

def to_my_df(df):
    df['code'] = df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change'] = (df.buy - df.now) / df.now
    return df

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    # an arbitrary derived column, purely for demonstration
    df['jiange'] = df.now - df.now.shift(1)
    return df[['code', 'name', 'jiange']]

def get_index(df):
    return df[df.code.str.startswith('150')]

def my_sort(df):
    return df.sort_values('jiange').tail(10)
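The frame-level helpers can be exercised on a tiny hand-made quote frame that mimics the sina fields they touch (hypothetical codes and prices, purely for illustration):
quotes = pd.DataFrame(
    {'name': ['fund-a', 'fund-b', 'stock-c'],
     'now': [10.0, 11.0, 12.0],
     'buy': [10.1, 11.0, 12.2],
     'ask1_volume': [500, 800, 50]},
    index=['150001', '150002', '600000'])
print(getmydf(to_my_df(quotes)))   # code, name and the 'jiange' difference column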
In [51]:
# source.stop()
source = Stream.from_redis(['stream-a'],group='new16', start=False)
mysource = source.flatten().map(parse)
from fn import F
otherkey = mysource.filter(isotherkey)
l = otherkey.sink_to_list()
squant = mysource.filter(ismykey).map(get_quant).map(pd.read_msgpack).filter(isdf)
sdf = squant.map(to_my_df).map(getmydf).map(get_index).map(lambda df:df.sample(15)).map(my_sort)
sdf
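sdf by itself only displays the stream node; to actually see the transformed frames as they arrive, attach a sink, e.g.:
results = sdf.sink_to_list()   # collect each emitted frame into a list
# or: sdf.sink(print)          # print each frame as it passes through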
In [52]:
source.start()
In [49]:
source.stop()
In [7]:
source.visualize()
Out[7]: (graphviz rendering of the stream pipeline)
Source functions that generate the data to be pushed.
In [26]:
def gen_quant():
    import easyquotation
    quotation_engine = easyquotation.use('sina')
    q1 = quotation_engine.all
    df = pd.DataFrame(q1).T
    return {'quant_df': df.to_msgpack()}

def gen_test():
    import moment as mm
    return {'test': mm.now().seconds}

def gen_block_test():
    import moment as mm
    import time
    time.sleep(6)  # deliberately blocks for 6 seconds to simulate a slow producer
    return {'block_test': mm.now().seconds}
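Note that each generator returns a flat dict of field -> value, which is exactly what engine.do_push hands to Stream.add: the dict becomes the field-value pairs of a single stream entry, so the values should be bytes, strings or numbers (which is why the quotes DataFrame is serialized with to_msgpack before being pushed).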
In [9]:
engine2 = engine(topic='stream-a',func=gen_test,push_interval=1)
engine2.start()
In [18]:
engine2.stop()
In [27]:
engine1 = Stream.engine(topic='stream-a',func=gen_quant,push_interval=5,asyncflag=True)
engine1.start()
In [53]:
engine1.stop()
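gen_block_test sleeps for 6 seconds while push_interval is 1, so a synchronous engine would stall the shared IOLoop (and with it the from_redis polling); with asyncflag=True each do_push call is handed off to the thread pool (threadcount=10) and the loop keeps ticking.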
In [ ]:
engine3 = Stream.engine(topic='stream-a',func=gen_block_test,push_interval=1,asyncflag=True,threadcount=10)
engine3.start()
In [ ]:
engine3.stop()