In [1]:
import easyquotation
import pandas as pd
import numpy as np
import time
from confluent_kafka import Producer,Consumer
quanta_list = []
quotation_engine = easyquotation.use('sina')
def foo():
while True:
q1 = quotation_engine.all
df = pd.DataFrame(q1).T
p.produce('test-quant',df.to_msgpack())
time.sleep(10)
In [2]:
q1 = quotation_engine.all
df = pd.DataFrame(q1).T
In [3]:
from streamz.dataframe import DataFrame
from streamz import Stream
conf = {'bootstrap.servers': 'localhost:9092',
'message.max.bytes': 524288000,
'group.id': 'hi', 'session.timeout.ms': 6000,
# 'on_commit': lambda x:x,
# 'on_assign':lambda x:x,
'default.topic.config': {'auto.offset.reset': 'smallest'},#smallest,earliest
}
def isdf(x):
return type(x)==pd.core.frame.DataFrame
source = Stream.from_kafka(['test-quant'],consumer_params=conf,start=True)
quant = source.map(pd.read_msgpack).filter(isdf)
quant.map(lambda df:df.head()).map(lambda df:df[['now','open']]).sink(display)
var element = $('#61901593-c697-4e0e-ad17-c8f2c3fae6ae'); {"model_id": "8629bab4ae2a42fe908a3fe8b82354c0", "version_major": 2, "version_minor": 0}
In [4]:
def to_my_df(df):
df['code']=df.index
df = df.reset_index()
df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
df['p_change']=(df.buy-df.now)/df.now
return df
def getmycode(df):
return df[df.code.str.startswith('150')]
def getmydf(df):
df = df.query('ask1_volume>100')
df.now = df.now.astype(float)
#随便写的,没有意义
df['jiange'] = df.now-df.now.shift(1)
return df[['code','name','jiange']]
def mygroup(df):
return df.pivot_table(index=['code','name'],values=['jiange'],aggfunc=np.average).sort_values('jiange').tail(10)
quant.map(to_my_df).map(getmydf).map(getmycode).sliding_window(1).map(pd.concat).map(mygroup).sink(display)
var element = $('#505e5b67-4fc6-4bed-a0d8-b1c3d9addda1'); {"model_id": "90191a8811c34609a599fa1b8d6af22d", "version_major": 2, "version_minor": 0}
In [5]:
p = Producer( {'bootstrap.servers': 'localhost:9092','message.max.bytes': 5242880})
p.produce('test-quant',df.to_msgpack())
In [6]:
source.visualize()
.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }
now | open | |
---|---|---|
000001 | 10.47 | 10.49 |
000002 | 25.36 | 24.81 |
000003 | 0 | 0 |
000004 | 16.78 | 16.7 |
000005 | 3.01 | 3 |
.dataframe tbody tr th:only-of-type { vertical-align: middle; } .dataframe tbody tr th { vertical-align: top; } .dataframe thead th { text-align: right; }
jiange | ||
---|---|---|
code | name | |
150130 | "医药A | 0.508 |
150344 | "证券B基 | 0.533 |
150172 | "证券B | 0.549 |
150265 | "一带A | 0.552 |
150241 | "银行A级 | 0.553 |
150209 | "国企改A | 0.566 |
150198 | "食品A | 0.572 |
150051 | "沪深300A | 0.583 |
150221 | "中航军A | 0.585 |
150028 | "中证500A | 0.821 |
Out[6]:
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有