线上运行的生产系统会定时采集一项丢包数据,这项数据与某个进程相关联,从进程启动开始就一直递增,每隔1分钟采集一次数据,当进程重启之后,这项数据会清零。现在要求使用PIG来统计某个时间段(1 hour)内,多个进程此项数据的变化量汇总。可以看到数据形如以下形式。进程会通过GrpID分组,每个组内有多个进程,需要计算的是各组VALUE值的总的变化量。总数据量约为12w。
PID GrpID TIMESTAMP VALUE
a.1 a 2017-08-04 20:01:00 5
a.1 a 2017-08-04 20:02:00 7
a.2 a 2017-08-04 20:00:00 13
a.1 a 2017-08-04 20:03:00 0
a.1 a 2017-08-04 20:04:00 10
b.3 b 2017-08-04 20:10:00 12
b.3 b 2017-08-04 20:11:00 27
b.3 b 2017-08-04 20:12:00 33
c.2 c 2017-08-04 20:01:00 27
c.3 c 2017-08-04 20:02:00 30
......
粗看起来这个问题似乎很简单,因为数据量并不是很大,可以首先LOAD整个数据集,然后按照PID分组,在分组内对TIMESTAMP时间排序,计算最后一个与第一个VALUE的差值,然后再对GrpID分组将刚才计算出来的差值求和即可。仔细想想这是不行的,因为在每个PID分组内,本次时间片内的数据有可能因为进程重启而清零(如下图),所以不能简单的按照时间排序后尾首相减来计算。
这种累积型数据的计算方式应该如下图,计算多个分段分别的diff值,最后汇总。
具体的算法也非常简单:
SUM_Diff = SUM((V_t – V_(t-1)) >= 0 ? (V_t – V_(t-1)) : 0)
从最后一个VALUE开始,计算Vt – V(t-1) 的值并求和,当遇到差值为负的情况,也就是出现了进程重启清零的情况,就加零。
上述算法很简单,用脚本可以很快搞定。但如果需要用PIG任务来写,第3个步骤就没有这么容易实现了。不过好在PIG脚本可以调用其他语言编写的UDF(User Define Function)来完成某些复杂的计算逻辑,我们就采用此种方案。如何使用Jython实现PIG UDF请参考官方文档
https://pig.apache.org/docs/r0.9.1/udf.html
先来看PIG脚本代码:
REGISTER 'pycalc/calc_lost_pkg.py' using jython as myudf;
REGISTER /data/gdata/pig-0.15.0/thirdparty/mysql-connector-java-5.1.38-bin.jar;
REGISTER /data/gdata/pig-0.15.0/thirdparty/piggybank-0.15.0.jar;
REGISTER /data/gdata/pig-0.16.0/thirdparty/KVLoader-0.5.1.jar
A = LOAD 'data.log' USING com.tencent.gdata.pig.KVLoader('&', '=', 'PID:GrpID:TIMESTAMP:VALUE');
B = FOREACH A GENERATE $0 AS pid, $1 AS grpid, $2 as ts, (int)$3 as value;
C = FILTER B BY pid is not null AND grpid is not null AND ts is not null AND value is not null;
D = FOREACH C GENERATE
pid as pid,
grpid as grpid,
ts as ts,
value as value;
uniq_D = DISTINCT D;
E = GROUP uniq_D BY (pid, grpid);
F = FOREACH E {
sorted = ORDER uniq_D by ts DESC;
diff_sum = myudf.calc_lost_pkg_cnt(sorted);
GENERATE FLATTEN(group) AS (pid, grpid),
diff_sum AS diff_sum;
};
G = FOREACH (GROUP F BY grpid) GENERATE
${MACRO_LOG_DAY}${MACRO_LOG_HOUR} as logtime,
group as grpid,
SUM(F.diff_sum) as lost_pkg_cnt;
H = FILTER G BY lost_pkg_cnt is not null;
STORE H INTO '/pigtest/test.result.7' USING org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver',
'jdbc:mysql://${MACRO_DBHOST}:${MACRO_DBPORT}/${MACRO_DATABASE}',
'${MACRO_USERNAME}', '${MACRO_PASSWORD}',
'REPLACE INTO ${MACRO_TABLENAME} (logtime, grpid, lost_pkg_cnt) VALUES (?, ?, ?)');
我们选用Jython来实现UDF,主要是实现第3步的逻辑,Python代码如下:
@outputSchema("sum:long")
def calc_lost_pkg_cnt(sorted_data):
sum = 0
for idx in xrange(len(sorted_data)):
if idx < len(sorted_data) - 1:
delta = sorted_data[idx][3] - sorted_data[idx+1][3]
if delta >= 0:
sum += delta
return sum
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。