The business needs to add multimedia processing, such as image resizing and video transcoding, on top of the existing RGW service. Looking at several vendors' designs, they all implement multimedia processing by appending a query string to the URL. For example, to crop the centered region and shrink it proportionally into a 200x200 thumbnail, the URL looks like: http://odum9helk.qnssl.com/resource/gogopher.jpg?imageView2/1/w/200/h/200
The image-operation parameters are all carried in the appended query string imageView2/1/w/200/h/200.
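As a minimal sketch of that convention (the helper name and parameter layout are assumptions for illustration, not a vendor SDK), such a URL can be assembled like this:

# Hypothetical helper that builds a query-string style processing URL
# following the imageView2 convention quoted above.
def image_view2_url(base_url, mode, width, height):
    # mode 1 = scale to fill, then crop the centered region
    return "{}?imageView2/{}/w/{}/h/{}".format(base_url, mode, width, height)

print(image_view2_url(
    "http://odum9helk.qnssl.com/resource/gogopher.jpg", 1, 200, 200))
# -> http://odum9helk.qnssl.com/resource/gogopher.jpg?imageView2/1/w/200/h/200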
Problem:
Approach:
1. Frontend logic
2. Backend logic
3. Fetching the task result
The demo implements the following:
Synchronous requests: use the existing http_image_filter_module to resize images already stored in RGW (ACL=public-read) on the fly.
Asynchronous requests: filter successful Put Object requests and send their user-defined metadata plus the x-amz-request-id to a backend Kafka. (The job server downstream of Kafka has to be implemented separately and is omitted here.)
The demo uses openresty/1.15.8.2; the http_image_filter_module is enabled by passing "--with-http_image_filter_module" at configure time.
The configuration in /usr/local/openresty/nginx/conf/nginx.conf is as follows:
server {
    listen 80;
    server_name test.s3.c.local *.s3.jrss.c.local;
    client_max_body_size 20m;

    location ~ "^(/.*/.*\.(jpg|png|jpeg))!c(\d+)-(\d+)$" {
        set $w $3;
        set $h $4;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!c(\d+)-(\d+)$ $1 break;
        image_filter crop $w $h;      # crop the image
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }

    location ~ "^(/.*/.*\.(jpg|png|jpeg))!r(\d+)-(\d+)$" {
        set $w $3;
        set $h $4;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!r(\d+)-(\d+)$ $1 break;
        image_filter resize $w $h;    # proportional thumbnail
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }

    location ~ "^(/.*/.*\.(jpg|png|jpeg))!d(\d+)$" {
        set $d $3;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!d(\d+)$ $1 break;
        image_filter rotate $d;       # rotate the image (90, 180 or 270 only)
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }

    location / {
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header HTTP_PROXY "";
        proxy_set_header Proxy "";
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_pass http://172.25.60.215:9000;
        log_by_lua_file /usr/local/openresty/nginx/conf/kafka.lua;
    }
}
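To make the path-suffix convention concrete, the following illustrative snippet (Python only for demonstration; nginx itself does the matching) shows what the crop location's regex captures for a request like /demo/1.jpg!c200-100:

import re

# Same pattern as the "!c" crop location above: group 1 = object path,
# group 2 = extension, group 3 = width, group 4 = height.
pattern = re.compile(r"^(/.*/.*\.(jpg|png|jpeg))!c(\d+)-(\d+)$")
m = pattern.match("/demo/1.jpg!c200-100")
print(m.group(1), m.group(3), m.group(4))  # /demo/1.jpg 200 100
# nginx sets $w/$h from these captures, rewrites the URI back to
# /demo/1.jpg, applies "image_filter crop 200 100", and proxies the
# request to the RGW upstream.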
The script /usr/local/openresty/nginx/conf/kafka.lua below forwards the user-defined metadata and the task (request) ID to Kafka.
local cjson = require "cjson"
local producer = require "resty.kafka.producer"

local broker_list = {
    { host = "localhost", port = 9092 },
}

-- Collect the user-defined metadata (x-amz-meta-*) plus a few request
-- fields and serialize them into a JSON job message.
local function send_job_to_kafka()
    local req_header = ""
    local udf_meta_reg = "^x-amz-meta-"
    local headers_ = ngx.req.get_headers()
    for k, v in pairs(headers_) do
        local meta_check, meta_err = ngx.re.match(k, udf_meta_reg)
        if meta_check then
            req_header = req_header .. k .. "=" .. v .. " "
        end
    end
    local log_json = {}
    log_json["uri"] = ngx.var.uri
    log_json["host"] = ngx.var.host
    log_json["remote_addr"] = ngx.var.remote_addr
    log_json["status"] = ngx.var.status
    log_json["request_method"] = ngx.var.request_method
    log_json["req_header"] = req_header
    log_json["http_x_amz_request_id"] = ngx.var.upstream_http_x_amz_request_id
    return cjson.encode(log_json)
end

local is_args = ngx.var.is_args
local request_method = ngx.var.request_method
local status_code = ngx.var.status

-- Only forward successful Put Object requests (PUT, 200, no query string):
-- record the metadata and request ID and send them to Kafka.
if request_method == "PUT" and status_code == "200" and is_args == "" then
    local bp = producer:new(broker_list, { producer_type = "async" })
    local ok, err = bp:send("test", "key", send_job_to_kafka())
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.log(ngx.ERR, "kafka send successful:", ok)
end
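The job server behind Kafka is omitted here; purely as a hedged sketch of what it could look like (the topic name "test", the metadata keys and the Pillow-based resize are assumptions carried over from this demo, not a prescribed design), a worker might consume the job messages like this:

import io
import json
import urllib.request

from kafka import KafkaConsumer
from PIL import Image  # assumes Pillow is installed

# Consume the job messages produced by kafka.lua (topic "test").
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])

for message in consumer:
    job = json.loads(message.value.decode('utf-8'))
    # req_header is the space-separated "k=v" string built in kafka.lua, e.g.
    # "x-amz-meta-save-path=tmp/new-img.jpg x-amz-meta-width=1920 ..."
    meta = dict(kv.split('=', 1) for kv in job['req_header'].split() if '=' in kv)
    width = meta.get('x-amz-meta-width')
    height = meta.get('x-amz-meta-height')
    save_path = meta.get('x-amz-meta-save-path')
    if not (width and height and save_path):
        continue  # no resize job attached to this object
    # The object was uploaded with ACL=public-read, so it can be read back
    # through the gateway without signing the request.
    url = 'http://{}{}'.format(job['host'], job['uri'])
    data = urllib.request.urlopen(url).read()
    img = Image.open(io.BytesIO(data))
    img.resize((int(width), int(height))).save(save_path)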
Next, the following boto3 client script uploads a test image with user-defined metadata and prints the processed-image URLs:

import boto3
from botocore.client import Config

aws_access_key_id = ''          # AK
aws_secret_access_key = ''      # SK
endpoint = 'http://test.s3.c.local'
bucket = 'demo'                 # bucket name
object_key = '1.jpg'            # object name
file_path = '/Desktop/op.jpg'   # path to the local image file

# aws4 signing
s3 = boto3.client('s3', region_name='cn-hb-pri1',
                  use_ssl=False,
                  endpoint_url=endpoint,
                  aws_access_key_id=aws_access_key_id,
                  aws_secret_access_key=aws_secret_access_key,
                  config=Config(signature_version='s3v4',
                                s3={'addressing_style': 'path'}))

with open(file_path, 'rb') as f:  # binary mode for the image upload
    response = s3.put_object(
        ACL='public-read',
        Body=f,
        Bucket=bucket,
        Key=object_key,
        ContentType='image/jpeg',
        Metadata={
            'save-path': 'tmp/new-img.jpg',
            'width': '1920',
            'height': '1200',
        },
    )

print("{}/{}/{}".format(endpoint, bucket, object_key))           # original image URL
print("{}/{}/{}!r50-50".format(endpoint, bucket, object_key))    # 50x50 thumbnail
print("{}/{}/{}!c100-100".format(endpoint, bucket, object_key))  # crop to 100x100
print("{}/{}/{}!d90".format(endpoint, bucket, object_key))       # rotate 90 degrees
A packet capture of the upload shows the user-defined metadata travelling as x-amz-meta-* request headers (screenshot omitted).
Running the script above prints four URLs; fetching them side by side shows the corresponding resize, crop and rotate effects.
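As a quick hedged check (assuming the requests library and the demo bucket/object above), the same URLs can also be verified programmatically:

import requests

base = 'http://test.s3.c.local/demo/1.jpg'
# Each suffix hits a different image_filter location in the nginx config.
for suffix in ('', '!r50-50', '!c100-100', '!d90'):
    r = requests.get(base + suffix)
    print(suffix or '(original)', r.status_code,
          r.headers.get('Content-Type'), len(r.content), 'bytes')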
Running the KafkaConsumer script below shows that the Kafka message for the asynchronous task was delivered successfully as well:
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))