gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

cnode社区版小程序代码

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面61 )
社交 3月 13,2021

cnode社区版小程序代码

通过本小程序,可以学习到如何用微信小程序开发一个论坛。在PC年代,论坛有很大流量,但在App时代,开始没之前受欢迎。用小程序实现论坛,可以分享到微信群来无缝结合。

var Api = require('../../utils/api.js');
var util = require('../../utils/util.js');

var navList = [
  {id:"all", title: "全部"},
  {id:"good", title: "精华"},
  {id:"share", title: "分享"},
  {id:"ask", title: "问答"},
  {id:"job", title: "招聘"}
];

Page({
  data: {
    activeIndex: 0,
    navList: navList,
    title: '话题列表',
    postsList: [],
    hidden: false,
    page: 1,
    limit: 20,
    tab: 'all'
  },

  onLoad: function () {
    this.getData();
  },

  onPullDownRefresh: function () {
    this.getData();
    console.log('下拉刷新', new Date());
  },

  
  onReachBottom: function () {
    this.lower();
    console.log('上拉刷新', new Date());
  },

  // 点击获取对应分类的数据
  onTapTag: function(e) {
    var that = this;
    var tab = e.currentTarget.id;
    var index = e.currentTarget.dataset.index;
    that.setData({
      activeIndex: index,
      tab: tab,
      page: 1
    });
    if (tab !== 'all') {
      that.getData({tab: tab});
    } else {
      that.getData();
    }
  },

  //获取文章列表数据
  getData: function() {
    var that = this;
    var tab = that.data.tab;
    var page = that.data.page;
    var limit = that.data.limit;
    var ApiUrl = Api.topics +'?tab='+ tab +'&page='+ page +'&limit='+ limit;

    that.setData({ hidden: false });

    if(page == 1) {
      that.setData({ postsList: [] });
    }

    Api.fetchGet(ApiUrl, (err, res) => {
      //更新数据
      that.setData({
        postsList: that.data.postsList.concat(res.data.map(function (item) {
          item.last_reply_at = util.getDateDiff(new Date(item.last_reply_at));
          return item;
        }))
      });

      setTimeout(function () {
        that.setData({ hidden: true });
      }, 300);
    })
  },

  // 滑动底部加载
  lower: function() {
    console.log('滑动底部加载', new Date());
    var that = this;
    that.setData({
      page: that.data.page + 1
    });
    if (that.data.tab !== 'all') {
      this.getData({tab: that.data.tab, page: that.data.page});
    } else {
      this.getData({page: that.data.page});
    }
  }


})

下载地址: 代码

作者 east
互联网类 3月 12,2021

微信小程序-干货集中营

本小程序,流式图文混排。可以学习到如何浏览图片,下载图片,提交表格,播放视频。

Page({
    data: {
        hidden: false,
        toastHidden: true,
        modalHidden: true,
        toastText: "数据无法正常显示,请将此问题上报管理员进行处理",
        loadingText: "加载中..."
    },

    onLoad: function (options) {
        that = this;
        if (options == null || options.publishTime == null || options.publishTime.split("-").length != 3) {
            this.setData({hidden: true, toastHidden: false});
            return;
        }

        requestData(options.publishTime.split("-"));
    },

    onImageClick: function (event) {
        this.setData({modalHidden: false})
    },

    onSaveClick: function (event) {
        saveIamge();
    },

    onCancelClick: function (event) {
        this.setData({modalHidden: true});
    },

    onToastChanged: function (event) {
        this.setData({toastHidden: true});
    }
});

var that;
var mIamgeUrl = "";
var mVideoUrl = "";

/**
 * 请求数据
 * @param that Page的对象,用其进行数据的更新
 */
function requestData(timeArray) {
    wx.request({
        url: Constant.BASE_URL + "/history/content/day/" + timeArray[0] + "/" + timeArray[1] + "/" + timeArray[2],
        header: {
            "Content-Type": "application/json"
        },
        success: function (res) {
            if (res == null ||
                res.data == null ||
                res.data.results == null ||
                res.data.results.length <= 0) {

                console.error(Constant.ERROR_DATA_IS_NULL);
                return;
            }
            parseHtml(res.data.results[0].content);
        }
    });
}

function saveIamge() {
    that.setData({
        hidden: false,
        toastHidden: true,
        modalHidden: true,
        loadingText: "下载中..."
    });
    wx.downloadFile({
        url: mIamgeUrl,
        type: 'image',
        success: function (res) {
            console.log("download success");
            that.setData({
                hidden: true,
                toastHidden: false,
                toastText: "图片已成功下载"
            });
        },
        fail: function (res) {
            console.log("download fail");
            that.setData({
                hidden: true,
                toastHidden: false,
                toastText: "下载失败,请重试"
            });
        },
    })
}

/**
 * 解析html块。
 * 这里应该还有挺大的优化空间,但是我对js确实不熟悉,只能用这种笨方法了
 * @param htmlBlock
 */
function parseHtml(htmlBlock) {

    //这边图片的URL解析同main.js中
    var re = new RegExp("[a-zA-z]+://[^\"]*");
    var title = htmlBlock.split("img alt=")[1].match(re)[0];

    if(-1 != title.search("//ww")){
        mIamgeUrl = title.replace("//ww","//ws");
    }else{
        mIamgeUrl = title;
    }

    var tags = [];
    var items = [];

    var doc = Util.parseHtml(htmlBlock);
    var tagElements = doc.getElementsByTagName("ul");
    console.log(doc);
    console.log(tagElements);
    var i = 0;
    for (; i < tagElements.length; i++) {
        var value = tagElements[i];
        if (value.innerText.trim().length == 0) {
            continue;
        }
        var valueChildren = value.children;
        var j = 0;
        var singleItems = [];
        for (; j < valueChildren.length; j++) {
            var singleItem = [];
            singleItem.push(valueChildren[j].innerText.trim());
            singleItem.push(valueChildren[j].children[0].href.trim());
            singleItems.push(singleItem);
        }
        items.push(singleItems);
    }

    var h3s = doc.getElementsByTagName("h3");
    var i = 0;
    for (; i < h3s.length; i++) {
        tags.push(h3s[i].innerText);
    }

    if (tags.length != items.length) {
        console.log("not right");
    }

    var finalData = [];

    var i = 0;
    for (; i < tags.length; i++) {
        var node = [];
        node.push(tags[i]);
        node.push(items[i]);
        finalData.push(node);
    }

    //将获得的各种数据写入itemList,用于setData
    var itemList = [];
    for (var i = 0; i < tags.length; i++) {
        var singleItemList = [];
        for (var j = 0; j < items[i].length; j++) {
            singleItemList.push({title: items[i][j][0], url: items[i][j][1]});
            if (i == tags.length - 1) {
                mVideoUrl = items[i][j][1];
            }
        }
        itemList.push({tag: tags[i], singleItems: singleItemList});
    }

    that.setData({
        data: itemList,
        imageUrl: mIamgeUrl,
        videoUrl: mVideoUrl,
        hidden: true
    });
    console.log(finalData);
}

var Constant = require('../../utils/constant.js');
var Util = require('../../utils/util.js');

下载地址:工程代码

作者 east
UI控件 3月 12,2021

微信小程序Flex布局例子

Flex布局是W3C组织在2009年提出的一个新的布局方案,其宗旨是让页面的样式布局更加简单,并且可以很好地支持响应式布局。这并不是小程序所独有的技术,它本身是CSS语法的一部分。只不过早期时候,主流的浏览器对Flex布局的支持并不完善,造成了很多开发者不知道有这种布局的存在或者使用非常少,我们还是习惯使用传统的position和float属性来布局。但传统的布局方式有它的缺陷,比如像垂直居中就不是那么容易实现,Flex可以很好地解决这些问题。小程序能够非常好地支持Flex布局,并且这也是官方推荐的布局方式。

Flex也称为“弹性布局”,主要作用在容器上,比如view组件,就是一个容器,它将页面中所有的元素都包裹起来。我们使用display:flex将这个view变成了一个弹性盒子。设置display:flex是应用一切弹性布局属性的先决条件,如果不设置display:flex,那么后续的其他相关弹性布局属性都将无效。接着我们使用flex-direction这个属性指定view内元素的排列方向。这个属性可能的值有4个:

• row

• column

• row-reverse

• column-reverse

要理解这4个属性,首先要了解一个Flex布局非常重要的概念:轴。我们知道,在一个平面直角坐标系里,轴有两个方向(就是X、Y),分别是水平方向和垂直方向。一个弹性盒子,需要确定一个主轴。这个主轴到底是水平方向还是垂直方向,就由flex-direction这个属性的值来确定。如果flex-direction值为row或者row-reverse,那么主轴的方向为水平方向,相反,如果值为column或者column-reverse,那么主轴为垂直方向。选定主轴的方向后,另外一个方向的轴我们成为“交叉轴”。也就是说,主轴并不一定就是水平方向,交叉轴也并不一定就是垂直方向,主轴的方向由flex-direction的取值来决定。理解这一点尤其重要。我们来看图3-8~图3-11所示。

图3-8到图3-11显示了当flex-direction取不同值时,主轴方向及子元素排布的情况。注意观察每张图里3个小item的排布顺序,主轴方向不同,子元素排布的方向也不同。

• flex-direction:row时,主轴水平,方向为自左向右

• flex-direction:row-reverse时,主轴水平,但方向为自右向左?

flex-direction:column时,主轴垂直,方向自上而下

• flex-direction:column-reverse时,主轴垂直,方向自下而上

例子图是这样的:

<view class="menu">
    <scroll-view scroll-x="true" class="top-nav">
        <view bindtap="toHot" class="top-btn {{hot}}">热门</view>
        <view bindtap="toFilm" class="top-btn {{film}}">电影</view>
        <view bindtap="toReadBook" class="top-btn {{book}}">读书</view>
        <view bindtap="toTelv" class="top-btn {{telv}}">电视</view>
        <view bindtap="toActive" class="top-btn {{active}}">活动</view>
        <view bindtap="toMusic" class="top-btn {{music}}">音乐</view>
        <view bindtap="toOther" class="top-btn {{other}}">其他</view>
        <view bindtap="toOther1" class="top-btn {{other1}}">其他1</view>
    </scroll-view>
</view>
<scroll-view scroll-y="true" class="content">
    <view class="img-poster">
        <image mode="scaleToFill" class="actual-img" src="https://qnmob.doubanio.com/img/files/file-1475046357.jpg?imageView2/2/q/60/w/600/h/0/format/jpg"></image>
    </view>
    <view class="col3">
         <view class="col3-h">
            <text>影院热映</text>
            <icon type="search" size="16" style="float:right"></icon>
        </view>
        <scroll-view scroll-x="true" class="col3-img-list">
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2382076389.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2377470803.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2380677316.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2380681527.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
        </scroll-view>
    </view>
    <view class="col3">
         <view class="col3-h">
            <text>免费在线观看新篇</text>
            <icon type="search" size="16" style="float:right"></icon>
        </view>
        <scroll-view scroll-x="true" class="col3-img-list">
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2351313845.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2324130130.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2332944143.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2367899630.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2346212678.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
        </scroll-view>
    </view>
    <view class="col3">
         <view class="col3-h">
            <text>新篇速递</text>
            <icon type="search" size="16" style="float:right"></icon>
        </view>
        <scroll-view scroll-x="true" class="col3-img-list">
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2355441956.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2318975149.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2371503931.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
            <view class="col3-img">
                <image class="c3-actual-img" mode="scaleToFill" src="https://qnmob.doubanio.com/view/movie_poster_cover/lpst/public/p2322079398.jpg?imageView2/0/q/80/w/9999/h/300/format/jpg"></image>
            </view>
        </scroll-view>
    </view>
    <view class="nav">
        <button type="primary" class="tag" size="mini">经典</button>
        <button type="primary" class="tag" size="mini">动画</button>
        <button type="primary" class="tag" size="mini">冷门佳片</button>
        <button type="primary" class="tag" size="mini">豆瓣高分</button>
        <button type="primary" class="tag" size="mini">动作</button>
        <button type="primary" class="tag" size="mini">喜剧</button>
        <button type="primary" class="tag" size="mini">悬疑</button>
        <button type="primary" class="tag" size="mini">文艺</button>
        <button type="primary" class="tag" size="mini">治愈</button>
        <button type="primary" class="tag" size="mini">科幻</button>
        <button type="primary" class="tag" size="mini">成长</button>
        <button type="primary" class="tag" size="mini">华语</button>
        <button type="primary" class="tag" size="mini">韩国</button>
        <button type="primary" class="tag" size="mini">日本</button>
    </view>
</scroll-view>

下载地址:http://www.hui1111.cn/minicode/wechat-app-flexlayout-master.rar

作者 east
多媒体 3月 11,2021

仿苹果音乐播放器

功能有我的音乐、为你推荐、浏览、广播、搜索等功能。

通过本例子,可以学习到全局变量使用。首先对比一下页面中的共享变量是如何设置的。页面的共享变量被设置在页面Page方法的object对象上,比如data就是object对象的一个属性。所以,我们在其他方法中才能够多次使用this.data的方式引用这个data对象。页面的共享变量应该在页面中设置,所以全局共享变量自然应该在应用程序级别设置。小程序提供了一个全局方法getApp(),用于获取小程序的App对象。

var pageObject = {
  data: {
    playing:false,
    playingSongsNum:0,
    musicGroupName:items[0],
    listTemplateName:'music-play-list',
    actionSheetHidden: true,
    actionSheetItems: items,
    playBar:{
      dataUrl:'http://stream.qqmusic.tc.qq.com/137192078.mp3',
      name: '告白气球',
      singer:'周杰伦',
      coverImgUrl: 'http://y.gtimg.cn/music/photo_new/T002R90x90M000003RMaRI1iFoYd.jpg'
    },
    songsList:_songsList,
    albumList :_albumList
  },
  playButtonTap:function(){
    var that = this

  },
  actionSheetTap: function(e) {
    this.setData({
      actionSheetHidden: !this.data.actionSheetHidden
    })
  },
  actionSheetChange: function(e) {
    this.setData({
      actionSheetHidden: !this.data.actionSheetHidden
    })
  },
  onLoad: function () {
    var that = this
    wx.onBackgroundAudioStop(function () {
      that.setData({
        playing: false
      })
    })
  },
  play: function (event) {
    var that = this
    var res=that.data.songsList[event.currentTarget.dataset.num]
    getApp().globalData.playing = res
    that.setData({
          playBar:res,
          playingSongsNum:event.currentTarget.dataset.num
    })
    wx.playBackgroundAudio({
      dataUrl: res.dataUrl,
      name: res.name,
      singer:res.singer,
      coverImgUrl: res.coverImgUrl,
      complete: function (res) {
        that.setData({
          playing: true
        })
      }
    })
  },
  pause: function () {
    var that = this
    wx.pauseBackgroundAudio({
      success: function () {
        that.setData({
          playing: false
        })
      }
    })
  },
  onUnload: function () {
    clearInterval(this.updateInterval)
  },
  onShow:function(){
    var that = this
    
      wx.request({
        url: 'http://120.27.93.97/weappserver/get_music.php',
        data: {
          mid: getApp().globalData.playing.mid
        },
        header: {
            'Content-Type': 'text/html;charset=utf-8'
        },
        success: function(res) {
          console.log(res.data)
          var obj=that.data.playBar
          obj['coverImgUrl']='http:'+res.data
          that.setData({
            playBar:obj
          })
        }
      })
      that.setData({
        playing: true,
        playBar: getApp().globalData.playing
      })
  }
}

 for (var i = 0; i < items.length; ++i) {
   (function(itemName) {
     switch(itemName){
       case '播放列表':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'music-play-list',
            templateData:null,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '歌曲':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'songs-list',
            templateData:_songsList,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '专辑':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'album-list',
            templateData:_albumList,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;

       case '演唱者':
          pageObject['bind' + itemName] = function(e) {
          console.log('click' + itemName, e)
          this.setData({
            musicGroupName:itemName,
            listTemplateName:'singer-list',
            templateData:null,
            actionSheetHidden: !this.data.actionSheetHidden
          })
        }
       break;
     }

   })(items[i])
 }

下载地址:代码

作者 east
Java 3月 11,2021

使用Springboot @Value配置时遇到几个不生效的问题

在开发项目时,把一些可能变化的东西,尽量搞成配置文件。这样以后有变化时,改一下配置就可以,不用开发人员重新编译。

使用Springboot的@Value, 常规配置方法是这样:

@Compent
public class TestA{

@Value("${MY_URL}")
private String myUrl;
}

在开发当中,如果按上面方式,遇到下面情况会不生效:

1、静态变量

不能像常规那样使用,要使用set方法,例如:

@Compent
public class TestA{
private static String myUrl;

@Value("${MY_URL}")
public void setMyUrl(String url){
myUrl = url;
}
}

2、构造函数

@Compent
public class TestA{
 public TestA(@Value("${MY_URL}") String myUrl){
 }
}
作者 east
Java 3月 11,2021

Properties获取配置的数组

JDK自带的Properties类没有Springboot用Value配置参数那么方便,尤其是数组, Properties 没有相应的方法。但可以用变通的方式。就是把数组配置成字符串,用特殊符号分隔。

InputStream in = RunTest.class.getClassLoader().getResourceAsStream("application.properties");
        try {
            properties.load(in);

            String redisListString = properties.getProperty("redisList");
String[] arr = redisListString.split(",");
}catch(Exception ex){
}

application.properties的配置如下:

redisList=192.68.1.2:22409,192.68.1.3:22532

作者 east
Hbase 3月 10,2021

Hbase统计海量数据行数

业务需要统计每天hbase的数据量,而且每天增量有上百万条。

网上找了些代码,很多推荐使用Coprocessor的方式,执行效率高。但在我的大数据环境 运行出错,报“No registered coprocessor service found for name AggregateService in region xxx”。后来发现是第一次运行时需要下面这些代码来修改配置

 String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);
public void rowCount(String tablename){
    try {
        //提前创建connection和conf
        Admin admin = connection.getAdmin();
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加协处理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);

        System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));
        stopWatch.stop();
        System.out.println("统计耗时:" +stopWatch.getTotalTimeMillis());
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
作者 east
程序员网赚 3月 7,2021

在4414发帖赚每月手机费(亲测有效)

生活不容易,水费、电费、手机费、交通费。。。虽然每一项不多,但加起来却不少。工资固定,开源节流就显得很重要。

如果你有一个网站或服务器,那恭喜你,可以在4414发发贴,每个月轻轻松松个电话费。(4414要验证网站)

如上图:每天可以回帖3次,总共赚1.5元,发帖1次赚0.5元,签到1次0.1元+。其中回帖基本像浏览新闻,基本不用动脑就可以回复评论的。

注册地址:4414网站

提现的话,需要点击右上角的“个人中心”然后在左边那个栏目中找到“我的积分”将你获得的金币安装10000:1的比例兑换成余额就可以在“财务中心”进行提现

最少提现1 元 ,请在每周五上午之前申请提现,所有提现每周五下午审核支付,每个支付宝只能绑定一个账号,重复绑定的账号封号处理 。如果有多个网站,就需要有多个支付宝账号来体现,感觉这个来钱远比集五福等任务简单。

最近梅州提现1次,已经体现了2次,赚了2杯奶茶的钱了。

作者 east
工具类 3月 6,2021

微信小程序上传多个图片源码

开发小程序,如果 除了图片外,还有其他几个文本表单,想着一起上传,微信小程序只能先把图片给传上去,然后再提交表单了。如果上传多张图片,是采用循环上传图片的方式。

const uploadFile = (filePath, clear = false, fullPath = false) => {
  return new Promise((resolve, reject) => {
    getEnv().then((res) => {
      if (!filePath || filePath.length < 9) {
        wx.showModal({
          title: '图片错误',
          content: '请重试',
          showCancel: false,
        })
        return;
      }
      const fileType = '.' + filePath.split('.')[filePath.split('.').length - 1]
      const aliyunFileKey = res.dir + new Date().getTime() + Math.floor(Math.random() * 150) + fileType;
      const aliyunServerURL = res.host;
      const accessid = res.accessId;
      wx.uploadFile({
        url: aliyunServerURL, //开发者服务器 url
        filePath: filePath, //要上传文件资源的路径
        name: 'file', //必须填file
        formData: {
          'key': aliyunFileKey,
          'policy': res.policy,
          'OSSAccessKeyId': accessid,
          'signature': res.signature,
          'success_action_status': '200',
        },
        success: function (res) {
          if (res.statusCode != 200) {
            reject();
          }
          if (clear) {
            clearEnv()
          }
          if (fullPath)
            resolve(aliyunServerURL + '/' + aliyunFileKey) // 全路径上传
          else
            resolve(aliyunFileKey) // 残路径上传
        },
        fail: function (err) {
          err.wxaddinfo = aliyunServerURL;
          failc(err);
        },
      })
    })

  })

}
// 上传图片
  upload(imageList, imageSrcList = []) {
    let that = this
    if (imageSrcList == []) {
      imageList.forEach(() => {
        imageSrcList.push([])
      })
    }
    let clear = (times == imageList.length - 1)
    uploadImage(imageList[times], clear).then((result) => {
      wx.hideLoading();
      times++
      wx.showLoading({
        title: times + '/' + imageList.length
      })
      imageSrcList[times - 1] = result
      if (times == imageList.length) {
        times = 0
        that.addPhoto(imageSrcList)
      } else {
        that.upload(imageList, imageSrcList)
      }
    })
  },
作者 east
php 3月 4,2021

解决wordpress密码正确后台登录不上

登录wordpress后台,密码正确,但是登录不上。网上找了一些方法:

修改文件“/wp-includes/pluggable.php”中的wp_set_auth_cookie函数。搜索此行代码(代码较长,分为两行):

1setcookie($auth_cookie_name,$auth_cookie,$expire,
2ADMIN_COOKIE_PATH,COOKIE_DOMAIN,$secure,true);

  将上面代码中的 ADMIN_COOKIE_PATH 改为 SITECOOKIEPATH ,当然,你会搜出两行,博主测试的结果是修改第一行就可以了,完整修改如下:

1setcookie($auth_cookie_name,$auth_cookie,$expire,
2SITECOOKIEPATH,COOKIE_DOMAIN,$secure,true);

发现还是登录不了。清缓存,换浏览器,重启php和mysql后,还是登录不上。

用df -h 看磁盘空间,发现也还没满。

用网上方法, 登录后直接访问WordPress站点地址(URL)/wp-admin/index.php ,发现也是登录不上。

然后想起最近装了一些插件,把插件改文件名称,也是无效。最后看到看日志,查了mysql的错误日志,发现下面日志:

InnoDB: mysqld and edit my.cnf so that newraw is replaced
InnoDB: with raw, and innodb_force_... is removed.
InnoDB: A new raw disk partition was initialized or
InnoDB: innodb_force_recovery is on: we do not allow
InnoDB: database modifications by the user. Shut down
InnoDB: mysqld and edit my.cnf so that newraw is replaced
InnoDB: with raw, and innodb_force_... is removed.

想到最近mysql崩溃了,在my.cnf的innodb_force_recovery的值被我强制改成1后恢复了,想回不会是这个影响,把
innodb_force_recovery 的值又改为0。重启mysql后登录wordpress,果然正常了。

总结:网上的方法并不是简单照搬就能解决问题,要分析最近操作了什么,看日志,对症下药。

作者 east
Spark 3月 3,2021

Spark Streaming调优实践

当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本章我们就来介绍一些能够提高应用性能的参数和配置。另外需要指出的是,优化本身是一个具体性很强的事情,不同的应用及落地场景会有不同的优化方式,并没有一个统一的优化标准。本章我们将一些常用的和在项目中踩过的“坑”总结一下,列举以下常见的优化方式。

数据序列化在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在Spark中,主要有如下3个方面涉及序列化:

● 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

● 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

● 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER), Spark会将RDD中的每个partition都序列化成一个大的字节数组。而Spark综合考量易用性和性能,提供了下面两种序列化库。

● Java序列化:默认情况下,Spark使用Java的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现Java.io.Serializable接口的自定义类上。我们可以通过扩展Java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

● Kryo序列化:Spark在2.0.0以上的版本可以使用Kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍),但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。

我们可以在初始化SparkConf时,调用conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)来使用Kryo。一旦进行了这个配置,Kryo序列化不仅仅会用在Shuffling操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。Spark官方解释没有将Kryo作为默认序列化方式的唯一原因是,Kryo必须用户自己注册(注意如果我们不注册自定义类,Kryo也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用Kryo。另外值得注意的是,在Spark 2.0.0之后,Spark已经默认将Kryo序列化作为简单类型(基本类型、基本类型的数组及string类型)RDD进行Shuffling操作时传输数据的对象序列化方式。Spark已经自动包含注册了绝大部分Scala的核心类,如果需要向Kryo注册自己的类别,可以使用registerKryoClasses方法。使用Kryo的代码框架如下:

如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。同样在Spark Streaming中,通过优化序列化格式可以缩减数据序列化的开销,而在Streaming中还会涉及以下两类数据的序列化。

● 输入数据:在4.4.1节中曾讲过,Spark Streaming中不同于RDD默认是以非序列化的形式存于内存当中,Streaming中由接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在Streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为Spark本身的序列化格式。

● 由Streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于Spark核心默认使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在Spark还是在Spark Streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加了GC的开销。

2.广播大变量

我们可以看出,不论Spark还是Spark Streaming的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用Driver节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。另外,Spark在Master节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过20KB,是值得去优化的。当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用Spark中的unpersist()函数,Spark通常会按照LRU(leastRecently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用unpersist()函数。

3.数据处理和接收时的并行度

作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。关于partition和parallelism。partition指的就是数据分片的数量,每一次Task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多Executor的计算能力无法充分利用;但是如果partition太大了则会导致分片太多,执行效率降低。在执行Action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及Shuffle,因此这个parallelism的参数没有影响)。由上述可得,partition和parallelism这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过Spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量,如reduceByKey和reduceByKeyAndWindow。

Spark Streaming接收Kafka数据的方式,这个过程有一个数据反序列化并存储到Spark的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入DStream会创建一个单一的接收器(receiver在worker节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入DStream用来接收源数据流的不同分支(partitions)。如果我们利用Receiver的形式接收Kafka,一个单一的Kafka输入DStream接收了两个不同topic的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个topic上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个DStreams,可以通过union操作并为一个DStream,之后便可以在这个统一的输入DStream上进行操作,代码示例如下:

如果采用Direct连接方式,前面讲过Spark中的partition和Kafka中的partition是一一对应的,但一般默认设置为Kafka中partition的数量,这样来达到足够并行度以接收Kafka数据。

4.设置合理的批处理间隔

对于一个Spark Streaming应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个batch的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过Spark监控界面进行查看,比较批处理时间必须小于批处理间隔。通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的batch的端到端延迟来观察,也可以看全局延迟来观察(可以在Spark log4j的日志里或者使用StreamingListener接口,也可以直接在UI界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的Spark监控界面来判断系统的稳定性。

5. 内存优化内存

优化是在所有应用落地中必须经历的话题,虽然Spark在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbagecollection)导致的开销。通常来说,对于Java对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

● 每个独立的Java对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个Int类型数据),这个开销是相对巨大的。

● Java的String对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Chars array)本身之外,还需要保存如字符串长度等额外信息,而且由于String内部存储字符时是按照UTF-16格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

● 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

● 熟悉Java的开发者应该知道,Java数据类型分为基本类型和包装类型,对于int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map),需要使用对基本数据类型打包(当然这是Java的一个自动过程),而打包后的基本数据类型就会产生额外的开销。针对以上内存优化的基本问题,接下来首先介绍Spark中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从Spark的缓存及Java的垃圾回收方面进行分析,另外,也会对SparkStreaming进行分析。

5.1 内存管理

Spark对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于Shuffle类操作、join操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。在Spark内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值R,也就是说R指定了M中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。Spark的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。Spark提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

● Spark Memory.Fraction表示M的大小占整个JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及Spark内部的元数据(metadata),另一方面预防某些异常数据记录造成的OOM(Out of Memory)错误。

● Spark.Memory.StorageFraction表示R的大小占整个M的比例(默认是0.5), R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

5.2 优化策略

当我们需要初步判断内存的占用情况时,可以创建一个RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出RDD占用了多少内存。而对于特殊的对象,我们可以调用SizeEstimator的estimate()方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个Executor堆上所占用的内存是非常有效的。当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的Java本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

● 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用Java或者Scala标准库当中的集合类(如HashMap),在fastutil库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容Java标准库。

● 尽可能避免在数据结构中嵌套大量的小对象和指针。

● 考虑使用数值类ID或者枚举对象来代替字符串类型作为主键(Key)。

● 如果我们的运行时内存小于32GB,可以加上JVM配置-XX:+UseCompressedOops将指针的占用空间由8个字节压缩到4个字节,我们也可以在Spark-env.sh中进行配置。

假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在RDD的持久化接口中使用序列化的存储级别,如MEMORY_ONLY_SER, Spark便会将每个RDD分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用Kryo序列化方式,其占用大小远低于Java本身的序列化方式。

5.3 垃圾回收(GC)优化

如果我们在应用中进行了频繁的RDD变动,那么JVM的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个RDD,后续所有操作都围绕这个RDD,那么垃圾回收就不存在问题)。当Java需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。所以垃圾回收的开销是和Java对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用Int数组代替LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个RDD分区下仅有一个对象(一个字节数组)。注意当GC开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。在做GC优化时,我们首先需要了解GC发生的频率以及其所消耗的时间。这可以通过在Java选项中加入-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现;之后当Spark任务运行后,便可以在Worker日志中看到GC发生时打印的信息。注意这些日志是打印在集群中的Worker节点上的(在工作目录的stdout文件中),而非Driver程序。为了进一步优化GC,首先简单介绍下Java虚拟机内部是如何进行内存管理的。

(1)Java对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

(2)年轻代的区域又被分为3个部分[Eden, Survivor1,Survivor2]。

(3)一个简单的GC流程大致是:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Survivor区域是可交换的(即来回复制),当一个对象存活周期已足够长或者Survivor2区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程。Java的这种GC机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分GC的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。基于以上原因,Spark在GC方面优化的主要目标是:只有长生命周期的RDD会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。我们可以通过以下步骤来一步步优化:

(1)通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

(2)如果触发了过多的小型GC,而完整的GC操作并没有调用很多次,那么给Eden区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmn=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到survivor区域所需空间)。(3)如果我们观察GC打印的统计信息,发现老年代接近存满,那么就需要改变spark.memory.fraction来减少存储类内存(用于caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过-Xmn来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.Memory.Fraction。

(4)通过加入-XX:+UserG1GC来使用G1GC垃圾回收器,这可以一定程度提高GC的性能。另外注意对于executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。

针对以上步骤我们举一个例子,如果我们的任务是从HDFS当中读取数据,任务需要的内存空间可以通过从HDFS当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个HDFS块大小是128MB,那么需要将Eden大小设置为4×3×128MB。改动之后,我们可以监控GC的频率和时间消耗,看看有没有达到优化的效果。对于优化GC,主要还是从降低全局GC的频率出发,executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置。

5.4 Spark Streaming内存优化

前面介绍了Spark中的优化策略和关于GC方面的调优,对于Spark Streaming的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。对于Spark Streaming应用所需要的集群内存,很大程度上取决于要使用哪种类型的transformation操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了updateStateByKey操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的Map、Filter、Store操作,那么所需空间会较小。默认情况下,接收器接收来的数据会以StorageLevel.MEMORY_AND_DISK_SER_2的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失SparkStreaming应用的性能。所以通常建议为Spark Streaming应用分配充足的内存,可以在小规模数据集上进行测试和判断。另一方面与Spark程序有显著区别的是,Spark Streaming程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。基于此,我们可以通过以下几个参数对内存使用和GC开销进行优化调整。

● DStream的持久化级别:输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少GC的开销。所以优先使用Kryo序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

● 及时清理老数据:默认情况下所有的输入数据和由DStream的Transormation操作产生的持久RDD会被自动清理,即Spark Streaming会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过stremingContext.remember进行设置。

● CMS垃圾回收器:不同于之前我们在Spark中的建议,由于需要减少GC间的停顿,所以这里建议使用并发标记清除类的GC方式。即使并发GC会降低全局系统的生产吞吐量,但是使用这种GC可以使得每个Batch的处理时间更加一致(不会因为某个Batch处理时发生了GC,而导致处理时间剧增)。我们需要确保在Driver节点(在spark-submit中使用—driver-java-options)和Executor节点(在Spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了CMS GC方式。

● 其他减少GC开销的方式有:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。

作者 east
Kafka, Spark 3月 3,2021

SparkStreaming Direct方式读取kafka优缺点及示例(Redis保存offset)

在Spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有Receiver这一层,其会周期性地获取Kafka中每个topic(主题)的每个partition(分区)中的最新offsets(偏移量),之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图所示。

这种方法相较于Receiver方式的优势在于:

● 简化的并行。Direct方式中,Kafka中的partition与Spark内部的partition是一一对应的,这点使得我们可以很容易地通过增加Kafka中的partition来提高数据整体传输的并行度,而不像Receiver方式中还需要创建多个Receiver然后利用union再合并成统一的Dstream。

● 高效。Direct方式中,我们可以自由地根据offset来从Kafka中拉取想要的数据(前提是Kafka保留了足够长时间的数据),这对错误恢复提供了很好的灵活性。然而在Receiver的方式中,还需要将数据存入Write Ahead Log中,存在数据冗余的问题。

● 一次性接收精确的数据记录Direct方式中我们直接使用了低阶Kafka的API接口,offset默认会利用Spark Steaming的checkpoints来存储,同样也可以将其存到数据库等其他地方。然而在Receiver的方式中,由于使用了Kafka的高阶API接口,其默认是从ZooKeeper中拉取offset记录(通常Kafka取数据都是这样的),但是Spark Streaming消费数据的情况和ZooKeeper记录的情况是不同步的,当程序发生中断或者错误时,可能会造成数据重复消费的情况。

不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然Zookeeper就保存了当前消费的offset值,如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,是直接从Kafka来读数据,offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到ZooKeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到ZooKeeper的通用类中。下面示范用redis保存offset

object Demo {


  val IP_RANG: Array[String] = "91,92,93,94,95".split(",")
  val PORT_RANG: Array[String] = "22420,22421,22422,22423,22424,22425,22426,22427".split(",")
  val hosts = new util.HashSet[HostAndPort]()

  val sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def main(args: Array[String]) {

      val Array(checkPointDir, topic, brokers, groupId, cf, offset, dw_all_tn, dw_track_tn, dw_unique_tn, batchIntervel) = args

      login

      val client: JedisCluster = new JedisCluster(hosts, 5000)


      var topicPartitions: Map[TopicPartition, Long] = Map()

      if (client.exists(topic)) {
        val offsetMap: util.Map[String, String] = client.hgetAll(topic)
        val iterator: util.Iterator[String] = offsetMap.keySet().iterator()
        while (iterator.hasNext) {
          val key: String = iterator.next()
          val value: String = offsetMap.get(key)
          println(key + "------" + value)
          topicPartitions += (new TopicPartition(topic, key.toInt) -> value.toLong)
        }
      }
      client.close()

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "value.deserializer" -> classOf[StringDeserializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.kerberos.service.name" -> "kafka",
        "auto.offset.reset" -> offset,
        "kerberos.domain.name" -> "hadoop.hadoop.com",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )


      def functionToCreateContext(): StreamingContext = {

        //      val topicArr = topic.split(",")
        //      val topicSet = topicArr.toSet


        val locationStrategy = LocationStrategies.PreferConsistent
        //      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

        val sparkConf: SparkConf = new SparkConf().setAppName("jingyi_xn_dw_all&track")

        val ssc = new StreamingContext(sparkConf, Seconds(batchIntervel.toInt))
        //      if (!"nocp".equals(checkPointDir)) {
        //        ssc.checkpoint(checkPointDir)
        //      }


        val config = HBaseConfiguration.create()
        val hbaseContext = new HBaseContext(ssc.sparkContext, config)

        val stream = KafkaUtils.createDirectStream[String, String](ssc,
          locationStrategy,
          //        consumerStrategy
          ConsumerStrategies.Assign[String, String](topicPartitions.keys.toList, kafkaParams, topicPartitions)
        )
    }
}

def setRedisHost: Unit ={
    for (host <- IP_RANG) {
      for (port <- PORT_RANG) {
        hosts.add(new HostAndPort("192.68.196." + host, port.toInt))
      }
    }
  }

       
作者 east

上一 1 … 60 61 62 … 93 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (493)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (35)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.