gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

仿苹果音乐播放器

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面60 )
多媒体 3月 11,2021

仿苹果音乐播放器

功能有我的音乐、为你推荐、浏览、广播、搜索等功能。

通过本例子,可以学习到全局变量使用。首先对比一下页面中的共享变量是如何设置的。页面的共享变量被设置在页面Page方法的object对象上,比如data就是object对象的一个属性。所以,我们在其他方法中才能够多次使用this.data的方式引用这个data对象。页面的共享变量应该在页面中设置,所以全局共享变量自然应该在应用程序级别设置。小程序提供了一个全局方法getApp(),用于获取小程序的App对象。

var pageObject = {
  data: {
    playing:false,
    playingSongsNum:0,
    musicGroupName:items[0],
    listTemplateName:'music-play-list',
    actionSheetHidden: true,
    actionSheetItems: items,
    playBar:{
      dataUrl:'http://stream.qqmusic.tc.qq.com/137192078.mp3',
      name: '告白气球',
      singer:'周杰伦',
      coverImgUrl: 'http://y.gtimg.cn/music/photo_new/T002R90x90M000003RMaRI1iFoYd.jpg'
    },
    songsList:_songsList,
    albumList :_albumList
  },
  playButtonTap:function(){
    var that = this

  },
  actionSheetTap: function(e) {
    this.setData({
      actionSheetHidden: !this.data.actionSheetHidden
    })
  },
  actionSheetChange: function(e) {
    this.setData({
      actionSheetHidden: !this.data.actionSheetHidden
    })
  },
  onLoad: function () {
    var that = this
    wx.onBackgroundAudioStop(function () {
      that.setData({
        playing: false
      })
    })
  },
  play: function (event) {
    var that = this
    var res=that.data.songsList[event.currentTarget.dataset.num]
    getApp().globalData.playing = res
    that.setData({
          playBar:res,
          playingSongsNum:event.currentTarget.dataset.num
    })
    wx.playBackgroundAudio({
      dataUrl: res.dataUrl,
      name: res.name,
      singer:res.singer,
      coverImgUrl: res.coverImgUrl,
      complete: function (res) {
        that.setData({
          playing: true
        })
      }
    })
  },
  pause: function () {
    var that = this
    wx.pauseBackgroundAudio({
      success: function () {
        that.setData({
          playing: false
        })
      }
    })
  },
  onUnload: function () {
    clearInterval(this.updateInterval)
  },
  onShow:function(){
    var that = this
    
      wx.request({
        url: 'http://120.27.93.97/weappserver/get_music.php',
        data: {
          mid: getApp().globalData.playing.mid
        },
        header: {
            'Content-Type': 'text/html;charset=utf-8'
        },
        success: function(res) {
          console.log(res.data)
          var obj=that.data.playBar
          obj['coverImgUrl']='http:'+res.data
          that.setData({
            playBar:obj
          })
        }
      })
      that.setData({
        playing: true,
        playBar: getApp().globalData.playing
      })
  }
}

 for (var i = 0; i < items.length; ++i) {
   (function(itemName) {
     switch(itemName){
       case '播放列表':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'music-play-list',
            templateData:null,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '歌曲':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'songs-list',
            templateData:_songsList,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '专辑':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'album-list',
            templateData:_albumList,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '演唱者':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'singer-list',
            templateData:null,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;
     }

   })(items[i])
 }

下载地址:代码

作者 east
Java 3月 11,2021

使用Springboot @Value配置时遇到几个不生效的问题

在开发项目时,把一些可能变化的东西,尽量搞成配置文件。这样以后有变化时,改一下配置就可以,不用开发人员重新编译。

使用Springboot的@Value, 常规配置方法是这样:

@Compent
public class TestA{

@Value("${MY_URL}")
private String myUrl;
}

在开发当中,如果按上面方式,遇到下面情况会不生效:

1、静态变量

不能像常规那样使用,要使用set方法,例如:

@Compent
public class TestA{
private static String myUrl;

@Value("${MY_URL}")
public void setMyUrl(String url){
myUrl = url;
}
}

2、构造函数

@Compent
public class TestA{
 public TestA(@Value("${MY_URL}") String myUrl){
 }
}
作者 east
Java 3月 11,2021

Properties获取配置的数组

JDK自带的Properties类没有Springboot用Value配置参数那么方便,尤其是数组, Properties 没有相应的方法。但可以用变通的方式。就是把数组配置成字符串,用特殊符号分隔。

InputStream in = RunTest.class.getClassLoader().getResourceAsStream("application.properties");
        try {
            properties.load(in);

            String redisListString = properties.getProperty("redisList");
String[] arr = redisListString.split(",");
}catch(Exception ex){
}

application.properties的配置如下:

redisList=192.68.1.2:22409,192.68.1.3:22532

作者 east
Hbase 3月 10,2021

Hbase统计海量数据行数

业务需要统计每天hbase的数据量,而且每天增量有上百万条。

网上找了些代码,很多推荐使用Coprocessor的方式,执行效率高。但在我的大数据环境 运行出错,报“No registered coprocessor service found for name AggregateService in region xxx”。后来发现是第一次运行时需要下面这些代码来修改配置

 String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);
public void rowCount(String tablename){
    try {
        //提前创建connection和conf
        Admin admin = connection.getAdmin();
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加协处理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);

        System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));
        stopWatch.stop();
        System.out.println("统计耗时:" +stopWatch.getTotalTimeMillis());
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
作者 east
程序员网赚 3月 7,2021

在4414发帖赚每月手机费(亲测有效)

生活不容易,水费、电费、手机费、交通费。。。虽然每一项不多,但加起来却不少。工资固定,开源节流就显得很重要。

如果你有一个网站或服务器,那恭喜你,可以在4414发发贴,每个月轻轻松松个电话费。(4414要验证网站)

如上图:每天可以回帖3次,总共赚1.5元,发帖1次赚0.5元,签到1次0.1元+。其中回帖基本像浏览新闻,基本不用动脑就可以回复评论的。

注册地址:4414网站

提现的话,需要点击右上角的“个人中心”然后在左边那个栏目中找到“我的积分”将你获得的金币安装10000:1的比例兑换成余额就可以在“财务中心”进行提现

最少提现1 元 ,请在每周五上午之前申请提现,所有提现每周五下午审核支付,每个支付宝只能绑定一个账号,重复绑定的账号封号处理 。如果有多个网站,就需要有多个支付宝账号来体现,感觉这个来钱远比集五福等任务简单。

最近梅州提现1次,已经体现了2次,赚了2杯奶茶的钱了。

作者 east
工具类 3月 6,2021

微信小程序上传多个图片源码

开发小程序,如果 除了图片外,还有其他几个文本表单,想着一起上传,微信小程序只能先把图片给传上去,然后再提交表单了。如果上传多张图片,是采用循环上传图片的方式。

const uploadFile = (filePath, clear = false, fullPath = false) => {
  return new Promise((resolve, reject) => {
    getEnv().then((res) => {
      if (!filePath || filePath.length < 9) {
        wx.showModal({
          title: '图片错误',
          content: '请重试',
          showCancel: false,
        })
        return;
      }
      const fileType = '.' + filePath.split('.')[filePath.split('.').length - 1]
      const aliyunFileKey = res.dir + new Date().getTime() + Math.floor(Math.random() * 150) + fileType;
      const aliyunServerURL = res.host;
      const accessid = res.accessId;
      wx.uploadFile({
        url: aliyunServerURL, //开发者服务器 url
        filePath: filePath, //要上传文件资源的路径
        name: 'file', //必须填file
        formData: {
          'key': aliyunFileKey,
          'policy': res.policy,
          'OSSAccessKeyId': accessid,
          'signature': res.signature,
          'success_action_status': '200',
        },
        success: function (res) {
          if (res.statusCode != 200) {
            reject();
          }
          if (clear) {
            clearEnv()
          }
          if (fullPath)
            resolve(aliyunServerURL + '/' + aliyunFileKey) // 全路径上传
          else
            resolve(aliyunFileKey) // 残路径上传
        },
        fail: function (err) {
          err.wxaddinfo = aliyunServerURL;
          failc(err);
        },
      })
    })

  })

}
// 上传图片
  upload(imageList, imageSrcList = []) {
    let that = this
    if (imageSrcList == []) {
      imageList.forEach(() => {
        imageSrcList.push([])
      })
    }
    let clear = (times == imageList.length - 1)
    uploadImage(imageList[times], clear).then((result) => {
      wx.hideLoading();
      times++
      wx.showLoading({
        title: times + '/' + imageList.length
      })
      imageSrcList[times - 1] = result
      if (times == imageList.length) {
        times = 0
        that.addPhoto(imageSrcList)
      } else {
        that.upload(imageList, imageSrcList)
      }
    })
  },
作者 east
php 3月 4,2021

解决wordpress密码正确后台登录不上

登录wordpress后台,密码正确,但是登录不上。网上找了一些方法:

修改文件“/wp-includes/pluggable.php”中的wp_set_auth_cookie函数。搜索此行代码(代码较长,分为两行):

1setcookie($auth_cookie_name,$auth_cookie,$expire,
2ADMIN_COOKIE_PATH,COOKIE_DOMAIN,$secure,true);

  将上面代码中的 ADMIN_COOKIE_PATH 改为 SITECOOKIEPATH ,当然,你会搜出两行,博主测试的结果是修改第一行就可以了,完整修改如下:

1setcookie($auth_cookie_name,$auth_cookie,$expire,
2SITECOOKIEPATH,COOKIE_DOMAIN,$secure,true);

发现还是登录不了。清缓存,换浏览器,重启php和mysql后,还是登录不上。

用df -h 看磁盘空间,发现也还没满。

用网上方法, 登录后直接访问WordPress站点地址(URL)/wp-admin/index.php ,发现也是登录不上。

然后想起最近装了一些插件,把插件改文件名称,也是无效。最后看到看日志,查了mysql的错误日志,发现下面日志:

InnoDB: mysqld and edit my.cnf so that newraw is replaced
InnoDB: with raw, and innodb_force_... is removed.
InnoDB: A new raw disk partition was initialized or
InnoDB: innodb_force_recovery is on: we do not allow
InnoDB: database modifications by the user. Shut down
InnoDB: mysqld and edit my.cnf so that newraw is replaced
InnoDB: with raw, and innodb_force_... is removed.

想到最近mysql崩溃了,在my.cnf的innodb_force_recovery的值被我强制改成1后恢复了,想回不会是这个影响,把
innodb_force_recovery 的值又改为0。重启mysql后登录wordpress,果然正常了。

总结:网上的方法并不是简单照搬就能解决问题,要分析最近操作了什么,看日志,对症下药。

作者 east
Spark 3月 3,2021

Spark Streaming调优实践

当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本章我们就来介绍一些能够提高应用性能的参数和配置。另外需要指出的是,优化本身是一个具体性很强的事情,不同的应用及落地场景会有不同的优化方式,并没有一个统一的优化标准。本章我们将一些常用的和在项目中踩过的“坑”总结一下,列举以下常见的优化方式。

数据序列化在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在Spark中,主要有如下3个方面涉及序列化:

● 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

● 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

● 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER), Spark会将RDD中的每个partition都序列化成一个大的字节数组。而Spark综合考量易用性和性能,提供了下面两种序列化库。

● Java序列化:默认情况下,Spark使用Java的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现Java.io.Serializable接口的自定义类上。我们可以通过扩展Java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

● Kryo序列化:Spark在2.0.0以上的版本可以使用Kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍),但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。

我们可以在初始化SparkConf时,调用conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)来使用Kryo。一旦进行了这个配置,Kryo序列化不仅仅会用在Shuffling操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。Spark官方解释没有将Kryo作为默认序列化方式的唯一原因是,Kryo必须用户自己注册(注意如果我们不注册自定义类,Kryo也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用Kryo。另外值得注意的是,在Spark 2.0.0之后,Spark已经默认将Kryo序列化作为简单类型(基本类型、基本类型的数组及string类型)RDD进行Shuffling操作时传输数据的对象序列化方式。Spark已经自动包含注册了绝大部分Scala的核心类,如果需要向Kryo注册自己的类别,可以使用registerKryoClasses方法。使用Kryo的代码框架如下:

如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。同样在Spark Streaming中,通过优化序列化格式可以缩减数据序列化的开销,而在Streaming中还会涉及以下两类数据的序列化。

● 输入数据:在4.4.1节中曾讲过,Spark Streaming中不同于RDD默认是以非序列化的形式存于内存当中,Streaming中由接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在Streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为Spark本身的序列化格式。

● 由Streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于Spark核心默认使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在Spark还是在Spark Streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加了GC的开销。

2.广播大变量

我们可以看出,不论Spark还是Spark Streaming的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用Driver节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。另外,Spark在Master节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过20KB,是值得去优化的。当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用Spark中的unpersist()函数,Spark通常会按照LRU(leastRecently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用unpersist()函数。

3.数据处理和接收时的并行度

作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。关于partition和parallelism。partition指的就是数据分片的数量,每一次Task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多Executor的计算能力无法充分利用;但是如果partition太大了则会导致分片太多,执行效率降低。在执行Action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及Shuffle,因此这个parallelism的参数没有影响)。由上述可得,partition和parallelism这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过Spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量,如reduceByKey和reduceByKeyAndWindow。

Spark Streaming接收Kafka数据的方式,这个过程有一个数据反序列化并存储到Spark的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入DStream会创建一个单一的接收器(receiver在worker节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入DStream用来接收源数据流的不同分支(partitions)。如果我们利用Receiver的形式接收Kafka,一个单一的Kafka输入DStream接收了两个不同topic的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个topic上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个DStreams,可以通过union操作并为一个DStream,之后便可以在这个统一的输入DStream上进行操作,代码示例如下:

如果采用Direct连接方式,前面讲过Spark中的partition和Kafka中的partition是一一对应的,但一般默认设置为Kafka中partition的数量,这样来达到足够并行度以接收Kafka数据。

4.设置合理的批处理间隔

对于一个Spark Streaming应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个batch的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过Spark监控界面进行查看,比较批处理时间必须小于批处理间隔。通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的batch的端到端延迟来观察,也可以看全局延迟来观察(可以在Spark log4j的日志里或者使用StreamingListener接口,也可以直接在UI界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的Spark监控界面来判断系统的稳定性。

5. 内存优化内存

优化是在所有应用落地中必须经历的话题,虽然Spark在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbagecollection)导致的开销。通常来说,对于Java对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

● 每个独立的Java对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个Int类型数据),这个开销是相对巨大的。

● Java的String对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Chars array)本身之外,还需要保存如字符串长度等额外信息,而且由于String内部存储字符时是按照UTF-16格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

● 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

● 熟悉Java的开发者应该知道,Java数据类型分为基本类型和包装类型,对于int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map),需要使用对基本数据类型打包(当然这是Java的一个自动过程),而打包后的基本数据类型就会产生额外的开销。针对以上内存优化的基本问题,接下来首先介绍Spark中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从Spark的缓存及Java的垃圾回收方面进行分析,另外,也会对SparkStreaming进行分析。

5.1 内存管理

Spark对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于Shuffle类操作、join操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。在Spark内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值R,也就是说R指定了M中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。Spark的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。Spark提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

● Spark Memory.Fraction表示M的大小占整个JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及Spark内部的元数据(metadata),另一方面预防某些异常数据记录造成的OOM(Out of Memory)错误。

● Spark.Memory.StorageFraction表示R的大小占整个M的比例(默认是0.5), R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

5.2 优化策略

当我们需要初步判断内存的占用情况时,可以创建一个RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出RDD占用了多少内存。而对于特殊的对象,我们可以调用SizeEstimator的estimate()方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个Executor堆上所占用的内存是非常有效的。当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的Java本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

● 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用Java或者Scala标准库当中的集合类(如HashMap),在fastutil库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容Java标准库。

● 尽可能避免在数据结构中嵌套大量的小对象和指针。

● 考虑使用数值类ID或者枚举对象来代替字符串类型作为主键(Key)。

● 如果我们的运行时内存小于32GB,可以加上JVM配置-XX:+UseCompressedOops将指针的占用空间由8个字节压缩到4个字节,我们也可以在Spark-env.sh中进行配置。

假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在RDD的持久化接口中使用序列化的存储级别,如MEMORY_ONLY_SER, Spark便会将每个RDD分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用Kryo序列化方式,其占用大小远低于Java本身的序列化方式。

5.3 垃圾回收(GC)优化

如果我们在应用中进行了频繁的RDD变动,那么JVM的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个RDD,后续所有操作都围绕这个RDD,那么垃圾回收就不存在问题)。当Java需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。所以垃圾回收的开销是和Java对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用Int数组代替LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个RDD分区下仅有一个对象(一个字节数组)。注意当GC开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。在做GC优化时,我们首先需要了解GC发生的频率以及其所消耗的时间。这可以通过在Java选项中加入-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现;之后当Spark任务运行后,便可以在Worker日志中看到GC发生时打印的信息。注意这些日志是打印在集群中的Worker节点上的(在工作目录的stdout文件中),而非Driver程序。为了进一步优化GC,首先简单介绍下Java虚拟机内部是如何进行内存管理的。

(1)Java对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

(2)年轻代的区域又被分为3个部分[Eden, Survivor1,Survivor2]。

(3)一个简单的GC流程大致是:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Survivor区域是可交换的(即来回复制),当一个对象存活周期已足够长或者Survivor2区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程。Java的这种GC机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分GC的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。基于以上原因,Spark在GC方面优化的主要目标是:只有长生命周期的RDD会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。我们可以通过以下步骤来一步步优化:

(1)通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

(2)如果触发了过多的小型GC,而完整的GC操作并没有调用很多次,那么给Eden区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmn=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到survivor区域所需空间)。(3)如果我们观察GC打印的统计信息,发现老年代接近存满,那么就需要改变spark.memory.fraction来减少存储类内存(用于caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过-Xmn来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.Memory.Fraction。

(4)通过加入-XX:+UserG1GC来使用G1GC垃圾回收器,这可以一定程度提高GC的性能。另外注意对于executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。

针对以上步骤我们举一个例子,如果我们的任务是从HDFS当中读取数据,任务需要的内存空间可以通过从HDFS当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个HDFS块大小是128MB,那么需要将Eden大小设置为4×3×128MB。改动之后,我们可以监控GC的频率和时间消耗,看看有没有达到优化的效果。对于优化GC,主要还是从降低全局GC的频率出发,executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置。

5.4 Spark Streaming内存优化

前面介绍了Spark中的优化策略和关于GC方面的调优,对于Spark Streaming的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。对于Spark Streaming应用所需要的集群内存,很大程度上取决于要使用哪种类型的transformation操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了updateStateByKey操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的Map、Filter、Store操作,那么所需空间会较小。默认情况下,接收器接收来的数据会以StorageLevel.MEMORY_AND_DISK_SER_2的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失SparkStreaming应用的性能。所以通常建议为Spark Streaming应用分配充足的内存,可以在小规模数据集上进行测试和判断。另一方面与Spark程序有显著区别的是,Spark Streaming程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。基于此,我们可以通过以下几个参数对内存使用和GC开销进行优化调整。

● DStream的持久化级别:输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少GC的开销。所以优先使用Kryo序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

● 及时清理老数据:默认情况下所有的输入数据和由DStream的Transormation操作产生的持久RDD会被自动清理,即Spark Streaming会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过stremingContext.remember进行设置。

● CMS垃圾回收器:不同于之前我们在Spark中的建议,由于需要减少GC间的停顿,所以这里建议使用并发标记清除类的GC方式。即使并发GC会降低全局系统的生产吞吐量,但是使用这种GC可以使得每个Batch的处理时间更加一致(不会因为某个Batch处理时发生了GC,而导致处理时间剧增)。我们需要确保在Driver节点(在spark-submit中使用—driver-java-options)和Executor节点(在Spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了CMS GC方式。

● 其他减少GC开销的方式有:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。

作者 east
Kafka, Spark 3月 3,2021

SparkStreaming Direct方式读取kafka优缺点及示例(Redis保存offset)

在Spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有Receiver这一层,其会周期性地获取Kafka中每个topic(主题)的每个partition(分区)中的最新offsets(偏移量),之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图所示。

这种方法相较于Receiver方式的优势在于:

● 简化的并行。Direct方式中,Kafka中的partition与Spark内部的partition是一一对应的,这点使得我们可以很容易地通过增加Kafka中的partition来提高数据整体传输的并行度,而不像Receiver方式中还需要创建多个Receiver然后利用union再合并成统一的Dstream。

● 高效。Direct方式中,我们可以自由地根据offset来从Kafka中拉取想要的数据(前提是Kafka保留了足够长时间的数据),这对错误恢复提供了很好的灵活性。然而在Receiver的方式中,还需要将数据存入Write Ahead Log中,存在数据冗余的问题。

● 一次性接收精确的数据记录Direct方式中我们直接使用了低阶Kafka的API接口,offset默认会利用Spark Steaming的checkpoints来存储,同样也可以将其存到数据库等其他地方。然而在Receiver的方式中,由于使用了Kafka的高阶API接口,其默认是从ZooKeeper中拉取offset记录(通常Kafka取数据都是这样的),但是Spark Streaming消费数据的情况和ZooKeeper记录的情况是不同步的,当程序发生中断或者错误时,可能会造成数据重复消费的情况。

不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然Zookeeper就保存了当前消费的offset值,如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,是直接从Kafka来读数据,offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到ZooKeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到ZooKeeper的通用类中。下面示范用redis保存offset

object Demo {


  val IP_RANG: Array[String] = "91,92,93,94,95".split(",")
  val PORT_RANG: Array[String] = "22420,22421,22422,22423,22424,22425,22426,22427".split(",")
  val hosts = new util.HashSet[HostAndPort]()

  val sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def main(args: Array[String]) {

      val Array(checkPointDir, topic, brokers, groupId, cf, offset, dw_all_tn, dw_track_tn, dw_unique_tn, batchIntervel) = args

      login

      val client: JedisCluster = new JedisCluster(hosts, 5000)


      var topicPartitions: Map[TopicPartition, Long] = Map()

      if (client.exists(topic)) {
        val offsetMap: util.Map[String, String] = client.hgetAll(topic)
        val iterator: util.Iterator[String] = offsetMap.keySet().iterator()
        while (iterator.hasNext) {
          val key: String = iterator.next()
          val value: String = offsetMap.get(key)
          println(key + "------" + value)
          topicPartitions += (new TopicPartition(topic, key.toInt) -> value.toLong)
        }
      }
      client.close()

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "value.deserializer" -> classOf[StringDeserializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.kerberos.service.name" -> "kafka",
        "auto.offset.reset" -> offset,
        "kerberos.domain.name" -> "hadoop.hadoop.com",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )


      def functionToCreateContext(): StreamingContext = {

        //      val topicArr = topic.split(",")
        //      val topicSet = topicArr.toSet


        val locationStrategy = LocationStrategies.PreferConsistent
        //      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

        val sparkConf: SparkConf = new SparkConf().setAppName("jingyi_xn_dw_all&track")

        val ssc = new StreamingContext(sparkConf, Seconds(batchIntervel.toInt))
        //      if (!"nocp".equals(checkPointDir)) {
        //        ssc.checkpoint(checkPointDir)
        //      }


        val config = HBaseConfiguration.create()
        val hbaseContext = new HBaseContext(ssc.sparkContext, config)

        val stream = KafkaUtils.createDirectStream[String, String](ssc,
          locationStrategy,
          //        consumerStrategy
          ConsumerStrategies.Assign[String, String](topicPartitions.keys.toList, kafkaParams, topicPartitions)
        )
    }
}

def setRedisHost: Unit ={
    for (host <- IP_RANG) {
      for (port <- PORT_RANG) {
        hosts.add(new HostAndPort("192.68.196." + host, port.toInt))
      }
    }
  }

       
作者 east
Hive 3月 2,2021

Hive开发建议

HQL编写之隐式类型转换

查询语句使用字段的值做过滤时,不建议通过Hive自身的隐式类型转换来编写HQL。因为隐式类型转换不利于代码的阅读和移植。

建议示例:

select * from default.tbl_src where id = 10001;
select * from default.tbl_src where name = 'TestName';

不建议示例:

select * from default.tbl_src where id = '10001';
select * from default.tbl_src where name = TestName;

说明:

表tbl_src的id字段为Int类型,name字段为String类型。

HQL编写之对象名称长度

HQL的对象名称,包括表名、字段名、视图名、索引名等,其长度建议不要超过30个字节。

Oracle中任何对象名称长度不允许超过30个字节,超过时会报错。PT为了兼容Oracle,对对象的名称进行了限制,不允许超过30个字节。

太长不利于阅读、维护、移植。

HQL编写之记录个数统计

统计某个表所有的记录个数,建议使用“select count(1) from table_name”。

统计某个表某个字段有效的记录个数,建议使用“select count(column_name) from table_name”。

JDBC超时限制

Hive提供的JDBC实现有超时限制,默认是5分钟,用户可以通过java.sql.DriverManager.setLoginTimeout(int seconds)设置,seconds的单位为秒。

UDF管理

建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。

Hive的UDF会有一些默认属性,比如deterministic 默认为true(同一个输入会返回同一个结果),stateful(是否有状态,默认为true)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,比如如下类

@UDFType(deterministic = false)
Public class MyGenericUDAFEvaluator implements Closeable {

表分区优化建议

  1. 当数据量较大,且经常需要按天统计时,建议使用分区表,按天存放数据。
  2. 为了避免在插入动态分区数据的过程中,产生过多的小文件,在执行插入时,在分区字段上加上distribute by。

存储文件格式优化建议

Hive支持多种存储格式,比如TextFile,RCFile,ORC,Sequence,Parquet等。为了节省存储空间,或者大部分时间只查询其中的一部分字段时,可以在建表时使用列式存储(比如ORC文件)。

作者 east
Spark 3月 2,2021

Spark开发规范

规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
// RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
// RDD操作时引入的类。
import org.apache.spark.SparkContext._
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf

Java与Scala函数有区别,在编写应用时,需要弄清楚每个函数的功能

RDD是不可改变的,也就是说,RDD的元素对象是不能更改的,因此,在用Java和Scala编写需要弄清楚每个函数的功能。下面举个例子。

场景:现有用户位置数据,按照时间排序生成用户轨迹。在Scala中,按时间排序的代码如下:

/* 函数实现的功能是得到某个用户的位置轨迹。
 * 参数trajectory:由两部分组成-用户名和位置点(时间,经度,维度)
 */
private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
// 先将用户位置点按时间排序
    val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);
    …
}

若用java实现上述功能,则需要将trajectory._2重新生成对象,而不能直接对trajectory._2进行排序操作。原因是Collections.sort(trajectory._2)这个操作会改变了trajectory._2这个对象本身,这违背了RDD元素不可更改这条规则;而Scala代码之所以能够正常运行,是因为sortBy( )这个函数生成了一个新的对象,它并不对trajectory._2直接操作。下面分别列出java实现的正确示例和错误示例。

正确示例:

// 将用户的位置点从新生成一个对象。
List<Tuple3< String, Float, Float >> list = new ArrayList<Tuple3< String, Float, Float >>( trajectory._2);
// 对新对象进行排序。
Collections.sort(list);

错误示例:

// 直接对用户位置点按照时间排序。
Collections.sort(trajectory._2);

分布式模式下,应注意Driver和Executor之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。

正确示例:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(…);
    return …; 
  }
}

错误示例:

//定义对象。
object Test
{
  // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
  private var testArg: String = null;
  // main函数
  def main(args: Array[String])
  {
    
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return …; 
  }
}

运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

说明:

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,YARN界面会显示失败。同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

作者 east
Flink 3月 1,2021

Flink Stream SQL Join程序

场景说明

假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。

基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

数据规划

  • 业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。Kafka配置参见样例数据规划章节。
  • 业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
    • 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
    • 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topic与producer一致。
  3. 从soket中读取数据,构造Table2。
  4. 使用Flink SQL对Table1和Table2进行联合查询,并进行打印。

Java样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

用户在开发前需要使用对接安全模式的FusionInsight Kafka,则需要引入FusionInsight的kafka-client-0.11.x.x.jar,该jar包可在FusionInsight client目录下获取。

下面列出producer和consumer,以及Flink Stream SQL Join使用主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.SqlJoinWithSocket

Java样例代码

  1. 每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。
//producer代码
public class WriteIntoKafka {

      public static void main(String[] args) throws Exception {

      // 打印出执行flink run的参考命令
        System.out.println("use command as: ");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("******************************************************************************************");

        System.out.println("<topic> is the kafka topic name");

        System.out.println("<bootstrap.servers> is the ip:port list of brokers");

        System.out.println("******************************************************************************************");
       
        // 构造执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并发度
        env.setParallelism(1);
        // 解析运行参数
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // 构造流图,将自定义Source生成的数据写入Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());

        FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),

           new SimpleStringSchema(),

           paraTool.getProperties()));

        messageStream.addSink(producer);

        // 调用execute触发执行
        env.execute();
     }

// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
        static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};

        static final String[] SEX = {"MALE", "FEMALE"};

        static final int COUNT = NAME.length;   

        boolean running = true;

        Random rand = new Random(47);

       @Override
        //rand随机产生名字,性别,年龄的组合信息
         public void run(SourceContext<String> ctx) throws Exception {

            while (running) {

                int i = rand.nextInt(COUNT);

                int age = rand.nextInt(70);

                String sexy = SEX[rand.nextInt(2)];

                ctx.collect(NAME[i] + "," + age + "," + sexy);

                thread.sleep(1000);

            }

    }

       @Override

       public void cancel() {

         running = false;

       }

     }

   }

2.生成Table1和Table2,并使用Join对Table1和Table2进行联合查询,打印输出结果。

public class SqlJoinWithSocket {
    public static void main(String[] args) throws Exception{

        final String hostname;

        final int port;

        System.out.println("use command as: ");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                + "--hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        try {
            final ParameterTool params = ParameterTool.fromArgs(args);

            hostname = params.has("hostname") ? params.get("hostname") : "localhost";

            port = params.getInt("port");

        } catch (Exception e) {
            System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");

            System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                    "type the input text into the command line");

            return;
        }
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        //基于EventTime进行处理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.setParallelism(1);

        ParameterTool paraTool = ParameterTool.fromArgs(args);

        //Stream1,从Kafka中读取数据
        DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                new SimpleStringSchema(),
                paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                String[] word = s.split(",");

                return new Tuple3<>(word[0], word[1], word[2]);
            }
        });

        //将Stream1注册为Table1
        tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");

        //Stream2,从Socket中读取数据
        DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] words = s.split("\\s");
                        if (words.length < 2) {
                            return new Tuple2<>();
                        }

                        return new Tuple2<>(words[0], words[1]);
                    }
                });

        //将Stream2注册为Table2
        tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");

        //执行SQL Join进行联合查询
        Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                "FROM Table1 AS t1\n" +
                "JOIN Table2 AS t2\n" +
                "ON t1.name = t2.name\n" +
                "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");

        //将查询结果转换为Stream,并打印输出
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
作者 east

上一 1 … 59 60 61 … 92 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.