源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

ruby和pig处理流式文件实例

  • 时间:2022-11-19 06:32 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:ruby和pig处理流式文件实例
大数据操作中涉及到数据清洗步奏还是用脚本处理比较方便,下边介绍一下pig加载hdfs文件后调用ruby脚本处理数据,再返回数据流至pig中处理的一个简单案例。 注意:ruby的流式处理用到wukong这个gem包,相关下载: https://github.com/mrflip/wukong pig中加载分布式文件调用ruby流式处理:
[u]复制代码[/u] 代码如下:
log = load '$INFILE' using PigStorage('\t'); define tracking_parser `/usr/ruby parse_click.rb --map` SHIP('parse_click.rb', 'click_tracking.rb'); strmo = stream log through tra_parser; store strmo into '$OUTFILE' using PigStorage('\t');
[u]复制代码[/u] 代码如下:
require 'wukong' require 'json' require './click_tra.rb' module ParseClick   class Mapper < Wukong::Streamer::RecordStreamer     def before_stream       @bad_count = 0     end     def after_stream       raise RuntimeError, "Exceeded bad records : #{@bad_count}" if @bad_count > 10     end     def process *records       yield ClickTra.new(JSON.parse(records[2])).to_a     rescue => e       @bad_count += 1       warn "Bad record #{e}: #{records[2]}"     end   end end Wukong.run ParseClick::Mapper, nil
[u]复制代码[/u] 代码如下:
require 'date' require './models.rb' class ClickTra     output :ip   output :c_date   #output your other atrributes   def c_date     click_date.strftime("%Y%m%d").to_i   end    def ip     browser_ip.to_i   end end
其中 strmo = stream log through tra_parser;调用定义的外部程序tra_parser处理log对象。 Wukong.run ParseClick::Mapper, nil执行完后,将ruby执行结果回调pig接收。 store strmo into '$OUTFILE' using PigStorage('\t');做结果存储持久化。
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部