基于 Byzer, 算法工程师可以完全自己一个人就能快速制作和发布一个基于AI模型的应用。整个流程会包括:
然后其他用户(Non-tech)就可以使用到算法工程师的成果,从这点上,我们 Beyond Data as API的(面向 Tech User)
今天,我们举一个图片分类的例子来展示如何基于 Byzer 快速构建深度学习模型,并且发布成应用。
你需要搭建三个系统(都是开源的):
Byzer Data as Form 项目地址: https://gitee.com/allwefantasy/byzer-data-as-form
我们分成三个部分:
分别对应如下三个 Notebook:
我们先来看 image_train
,也就是我们的训练过程。
首先,我们从数据湖里加载 mnist 数据集:
-- 加载图片数据
load image.`/trainingSet/trainingSet/**/*.jpg` as mnist;
-- 对图片做分区
run mnist as TableRepartition.`` where partitionNum="4" as newMnist;
-- 后保存到数据湖里去
save overwrite newMnist as delta.`data.raw_mnist` where mergeSchema="true";
执行结果:
接着,我们从数据湖加载,并且从路径得到图片的分类:
-- 从数据湖加载数据
load delta.`data.raw_mnist` as mnist;
-- 从路径中获得分类标签
select split(image.origin,"/")[7] as label, image.data as content from mnist as newMnist;
这样我们就有一个叫 newMnist
的数据集,包含了content, label 两个字段。
执行结果如下:
接着,我们采用 Tensorflow 深度学习模型,并且使用 parameter-server 模式的方式来实现一个分布式的模型训练。代码采用 Byzer-python, 通过这个 API 我们可以很方便的拿到前面 newMnist
数据集,然后生成一个模型。
代码如下:
#%python
#%input=newMnist
#%output=newMnist_model
#%cache=true
#%schema=file
#%dataMode=model
#%env=source /home/williamzhu/miniconda3/bin/activate ray-1.12.0
from functools import reduce
import os
import ray
import numpy as np
from tensorflow.keras import models,layers
from tensorflow.keras import utils as np_utils
from pyjava.api.mlsql import RayContext
from pyjava.storage import streaming_tar
## 连接 ray 集群
ray_context = RayContext.connect(globals(),"127.0.0.1:10001")
## 获取所有数据分片地址
data_servers = ray_context.data_servers()
replica_num = len(data_servers)
## 获取每个数据分片的数据
def data_partition_creater(data_server):
temp_data = [item for item in RayContext.collect_from([data_server])]
train_images = np.array([np.array(list(item["content"])) for item in temp_data])
train_labels = np_utils.to_categorical(np.array([item["label"] for item in temp_data]))
train_images = train_images.reshape((len(temp_data),28*28))
return train_images,train_labels
## 创建模型
def create_tf_model():
network = models.Sequential()
network.add(layers.Dense(512,activation="relu",input_shape=(28*28,)))
network.add(layers.Dense(10,activation="softmax"))
network.compile(optimizer="sgd",loss="categorical_crossentropy",metrics=["accuracy"])
return network
## 构建训练类
@ray.remote
class Network(object):
def __init__(self,data_server):
self.model = create_tf_model()
# you can also save the data to local disk if the data is
# not fit in memory
self.train_images,self.train_labels = data_partition_creater(data_server)
def train(self):
history = self.model.fit(self.train_images,self.train_labels,batch_size=128)
return history.history
def get_weights(self):
return self.model.get_weights()
def set_weights(self, weights):
# Note that for simplicity this does not handle the optimizer state.
self.model.set_weights(weights)
def get_final_model(self):
model_path = os.path.join("/","tmp","minist_model")
self.model.save(model_path)
model_binary = [item for item in streaming_tar.build_rows_from_file(model_path)]
return model_binary
def shutdown(self):
ray.actor.exit_actor()
## 实例化训练类,有多少个数据分片,就会实例化多少个训练类,并且他们都是在远程引用
workers = [Network.remote(data_server) for data_server in data_servers]
ray.get([worker.train.remote() for worker in workers])
## 获取初始训练的权重
_weights = ray.get([worker.get_weights.remote() for worker in workers])
## 对不同实例计算得到的权重做加权平均,然后返回给每个实例,再进行计算
def epoch_train(weights):
sum_weights = reduce(lambda a,b: [(a1 + b1) for a1,b1 in zip(a,b)],weights)
averaged_weights = [layer/replica_num for layer in sum_weights]
ray.get([worker.set_weights.remote(averaged_weights) for worker in workers])
ray.get([worker.train.remote() for worker in workers])
return ray.get([worker.get_weights.remote() for worker in workers])
for epoch in range(30):
_weights = epoch_train(_weights)
## 随机取一个模型的权重,然后输出
model_binary = ray.get(workers[0].get_final_model.remote())
[worker.shutdown.remote() for worker in workers]
ray_context.build_result(model_binary)
具体代码意思可以参考代码中的注释。这里需要注意的有:
newMnist_model
执行后的结果如下:
系统以文件形式展示。我们希望把这个模型保存到数据湖里:
save overwrite newMnist_model as delta.`ai_model.mnist_model`;
结果如下:
第一步是从数据湖里加载已经存储的模型
load delta.`ai_model.mnist_model` as mnist_model;
第二步则是把模型注册成 UDF函数,这里用了 Byzer 一个特殊语法 register
:
!python conf "rayAddress=127.0.0.1:10001";
!python conf "schema=file";
!python env "PYTHON_ENV=source /home/williamzhu/miniconda3/bin/activate ray-1.12.0";
!python conf "dataMode=model";
!python conf "runIn=driver";
register Ray.`mnist_model` as mnist_model_predict where
maxConcurrency="2"
and debugMode="true"
and registerCode='''
import ray
import numpy as np
from pyjava.api.mlsql import RayContext
from pyjava.udf import UDFMaster,UDFWorker,UDFBuilder,UDFBuildInFunc
ray_context = RayContext.connect(globals(), context.conf["rayAddress"])
## 这个是预测函数,函数会给你模型以及待预测向量
def predict_func(model,v):
train_images = np.array([v])
train_images = train_images.reshape((1,28*28))
predictions = model.predict(train_images)
return {"value":[[float(np.argmax(item)) for item in predictions]]}
UDFBuilder.build(ray_context,UDFBuildInFunc.init_tf,predict_func)
''' and
predictCode='''
import ray
from pyjava.api.mlsql import RayContext
from pyjava.udf import UDFMaster,UDFWorker,UDFBuilder,UDFBuildInFunc
ray_context = RayContext.connect(globals(), context.conf["rayAddress"])
UDFBuilder.apply(ray_context)
'''
;
核心其实只有 predict_func
, 其他都是样板代码。maxConcurrency 设置并行度,也就是这个 UDF 函数可以同时被多少个线程调用。注册的函数叫:mnist_model_predict
,注册完成后就可以在 SQL 中使用。
执行结果:
现在我们验证下预测函数是不是可以正确执行:
select mnist_model_predict(array(byteArrayToIntArray(content)))[0][0] as predicted_label,label
from newMnist limit 10
as output;
执行结果:
貌似不太准确,不过我们这里主要是演示流程。
到这里,我们完成了模型部署。
模型部署完成后,我们还需要提供一个界面,供 Non-tech用户使用。我们会在 Byzer Notebook 中进行Byzer 代码来制作界面,然后通过 Byzer Data as Form 进行界面渲染。
代码如下:
-- 控制这个 Form App 的访问性。在这里,只允许william访问
set FORM_VISIBILITY = "user:william";
-- 设置 Form App 的标题
set title = "图片预测" where type="defaultParam" and formType="title" ;
-- 设置 Form App 的表单元素,这里我们需要一个让用户上传图片的框
set pic="" where
type="defaultParam"
and formType="upload"
and label="上传图片";
保存下这个 Notebook,得到他的 ID(红框部分):
这样,我们界面就设计完了。这个 ID 号是 38, 我们到 Byzer Data as Form里注册下这个 ID:
点击 Byzer Data as Form的 Publish Form App:
然后注册下:
点击 Commit, 就能在首页看到自己发布的 App了:
点击进去,看看界面:
符合我们要求。不过此时还没有预测逻辑。我们继续完善我们 Form App 的代码。完整代码如下:
set FORM_VISIBILITY = "user:william";
set title = "图片预测" where type="defaultParam" and formType="title" ;
set pic="" where
type="defaultParam"
and formType="upload"
and label="上传图片";
select decodeImage(unbase64("${pic}")) as content, "wow1.png" as fileName as imageTable;
select mnist_model_predict(array(byteArrayToIntArray(content)))[0][0] as predicted_label
from imageTable
as output;
在上面的代码中,pic 参数会得到用户上传的图片,这个图片使用 base64 编码,我们需要用 SQL 对他进行解码,同时使用 decodeImage 对 jpeg 进行解码,然后使用前面注册的函数 mnist_model_predict
进行预测。
这样,我们就制作好了。
选择一张图片,然后上传:
点击 Commit:
得到预测结果。
Byzer 可以制作很复杂的 Form App,比如下面都是使用 Byzer 制作的:
通过结合 Byzer-notebook 和 Byzer Data as Form, 算法工程师可以轻松将自己的成果发布成 Form App提供给 Non-tech使用。