gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面30 )
Hive 3月 1,2021

Hive数据查询

数据查询

功能介绍

本小节介绍了如何使用HQL对数据进行查询分析。从本节中可以掌握如下查询分析方法:

  • SELECT查询的常用特性,如JOIN等。
  • 加载数据进指定分区。
  • 如何使用Hive自带函数。
  • 如何使用自定义函数进行查询分析,如何创建、定义自定义函数请见用户自定义函数。

样例代码

-- 查看薪水支付币种为美元的雇员联系方式. 
SELECT  
a.name,  
b.tel_phone,  
b.email  
FROM employees_info a JOIN employees_contact b  ON(a.id = b.id) WHERE usd_flag='D'; 
 
-- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. 
INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014')  
SELECT  
a.id,  
a.name,  
a.usd_flag,  
a.salary,  
a.deductions,  
a.address, 
b.tel_phone, 
b.email  
FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; 
 
-- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. 
SELECT COUNT(*) FROM employees_info; 
 
-- 查询使用以“cn”结尾的邮箱的员工信息. 
SELECT a.name, b.tel_phone FROM  employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn'; 

扩展使用

  • 配置Hive中间过程的数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec;
作者 east
Hive 3月 1,2021

Hive创建表示例

创建表

功能介绍

本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式:

  • 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。
    • 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。
    • 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。
  • 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。
  • 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。

样例代码

-- 创建外部表employees_info. 
CREATE EXTERNAL TABLE IF NOT EXISTS employees_info 
( 
id INT, 
name STRING, 
usd_flag STRING, 
salary DOUBLE, 
deductions MAP<STRING, DOUBLE>, 
address STRING, 
entrytime STRING 
) 
-- 指定行中各字段分隔符. 
-- "delimited fields terminated by"指定列与列之间的分隔符为',',"MAP KEYS TERMINATED BY"指定MAP中键值的分隔符为'&'. 
ROW FORMAT delimited fields terminated by ',' MAP KEYS TERMINATED BY '&'  
-- 指定表的存储格式为TEXTFILE. 
STORED AS TEXTFILE;  
 
-- 使用CREATE Like创建表. 
CREATE TABLE employees_like LIKE employees_info; 
 
-- 使用DESCRIBE查看employees_info、employees_like、 employees_as_select表结构. 
DESCRIBE employees_info; 
DESCRIBE employees_like; 

扩展应用

  • 创建分区表 一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。对分区内数据进行查询,可缩小查询范围,加快数据的检索速度和可对数据按照一定的条件进行管理。 分区是在创建表的时候用PARTITIONED BY子句定义的。 CREATE EXTERNAL TABLE IF NOT EXISTS employees_info_extended ( id INT, name STRING, usd_flag STRING, salary DOUBLE, deductions MAP<STRING, DOUBLE>, address STRING ) — 使用关键字PARTITIONED BY指定分区列名及数据类型 . PARTITIONED BY (entrytime STRING) STORED AS TEXTFILE;
  • 更新表的结构 一个表在创建完成后,还可以使用ALTER TABLE执行增、删字段,修改表属性,添加分区等操作 — 为表employees_info_extended增加tel_phone、email字段. ALTER TABLE employees_info_extended ADD COLUMNS (tel_phone STRING, email STRING);
  • 建表时配置Hive数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; create table seq_Codec (key string, value string) stored as RCFile;
作者 east
Hbase 3月 1,2021

Hbase 基于二级索引的查询

基于二级索引的查询

功能介绍

针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。

二级索引的使用规则如下:

  • 针对某一列或者多列创建了单索引的场景下:
    • 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)
  • 针对多个列创建的联合索引场景下:
    • 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1))
    • 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。
  • 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。

代码样例

下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中:

样例:使用二级索引查找数据

  public void testScanDataByIndex() {
    LOG.info("Entering testScanDataByIndex.");
    Table table = null;
    ResultScanner scanner = null;
    try {
      table = conn.getTable(tableName);
      
      // Create a filter for indexed column.
      Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"),
          CompareOp.EQUAL, "Li Gang".getBytes());
      Scan scan = new Scan();
      scan.setFilter(filter);
      scanner = table.getScanner(scan);
      LOG.info("Scan indexed data.");
      
      for (Result result : scanner) {
        for (Cell cell : result.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("Scan data by index successfully.");
    } catch (IOException e) {
      LOG.error("Scan data by index failed.");
    } finally {
      if (scanner != null) {
        // Close the scanner object.
        scanner.close();
      }
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Close table failed.");
      }
    }
    
    LOG.info("Exiting testScanDataByIndex.");
  }

注意事项

需要预先对字段name创建二级索引。

相关操作

基于二级索引表查询。

查询样例如下:

用户在hbase_sample_table的info列族的name列添加一个索引,在客户端执行,

hbase org.apache.hadoop.hbase.hindex.mapreduce.TableIndexer -Dtablename.to.index=hbase_sample_table -Dindexspecs.to.add='IDX1=>info:[name->String]'

然后用户需要查询“info:name”,在hbase shell执行如下命令:

>scan ‘hbase_sample_table’,{FILTER=>”SingleColumnValueFilter(family,qualifier,compareOp,comparator,filterIfMissing,latestVersionOnly)”}

说明:

hbase shell下面做复杂的查询请使用API进行处理。

参数说明:

  • family:需要查询的列所在的列族,例如info;
  • qualifier:需要查询的列,例如name;
  • compareOp:比较符,例如=、>等;
  • comparator:需要查找的目标值,例如binary:Zhang San;
  • filterIfMissing:如果某一行不存在该列,是否过滤,默认值为false;
  • latestVersionOnly:是否仅查询最新版本的值,默认值为false。

例如:

>scan 'hbase_sample_table',{FILTER=>"SingleColumnValueFilter('info','name',=,'binary:Zhang San',true,true)"}
作者 east
Hbase 3月 1,2021

Hbase创建二级索引

创建二级索引

功能简介

一般都通过调用org.apache.hadoop.hbase.hindex.client.HIndexAdmin中方法进行HBase二级索引的管理,该类中提供了创建索引的方法。

说明:

二级索引不支持修改,如果需要修改,请先删除旧的然后重新创建。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的createIndex方法中。

public void createIndex() {     
LOG.info("Entering createIndex.");
String indexName = "index_name";
// Create index instance
TableIndices tableIndices = new TableIndices();
IndexSpecification iSpec = new IndexSpecification(indexName); iSpec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String);//注[1]
tableIndices.addIndex(iSpec);
HIndexAdmin iAdmin = null;
Admin admin = null;
try {
admin = conn.getAdmin();
iAdmin = new IndexAdmin(conf);
// add index to the table
iAdmin.addIndices(tableName, tableIndices);
LOG.info("Create index successfully.");
} catch (IOException e) {
LOG.error("Create index failed " ,e);
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
if (iAdmin != null) {
try {
// Close IndexAdmin Object
iAdmin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
}
LOG.info("Exiting createIndex.");
}

新创建的二级索引默认是不启用的,如果需要启用指定的二级索引,可以参考如下代码片段。该代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的enableIndex方法中。

  public void enableIndex() {
    LOG.info("Entering createIndex.");

    // Name of the index to be enabled
    String indexName = "index_name";

    List<String> indexNameList = new ArrayList<String>();
    indexNameList.add(indexName);
    HIndexAdmin iAdmin = null;
    try {
      iAdmin = HIndexClient.newHIndexAdmin(conn.getAdmin());
      // Alternately, enable the specified indices
      iAdmin.enableIndices(tableName, indexNameList);
      System.out.println("Successfully enable indices " + indexNameList + " of the table " + tableName);
    } catch (IOException e) {
      System.out.println("Failed to enable indices " + indexNameList + " of the table " + tableName + "." + e);
    } finally {
      if (iAdmin != null) {
        try {
          iAdmin.close();
        } catch (IOException e) {
          LOG.error("Close admin failed ", e);
        }
      }
    }
  }

注意事项

注[1]:创建联合索引

HBase支持在多个字段上创建二级索引,例如在列name和age上。

HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);

相关操作

使用命令创建索引表。

您还可以通过TableIndexer工具在已有用户表中创建索引。

说明:

<table_name>用户表必须存在。

hbase org.apache.hadoop.hbase.index.mapreduce.TableIndexer -Dindexspecs.to.add=<table_name> -Dtable.columns.index='IDX1=>cf1:[q1->datatype&length];cf2:[q1->datatype],[q2->datatype],[q3->datatype]#IDX2=>cf1:[q5->datatype&length]

“#”用于区分不同的索引,“;”用于区分不同的列族,“,”用于区分不同的列。

tablename.to.index:创建索引的用户表表名。

indexspecs.to.add:创建索引对应的用户表列。

其中命令中各参数的含义如下:

  • IDX1:索引名称
  • cf1:列族名称。
  • q1:列名。
  • datatype:数据类型。数据类型仅支持Integer、String、Double、Float、Long、Short、Byte、Char类型。
作者 east
Hbase 3月 1,2021

HBase支持全文索引

HBase支持全文索引

功能简介

通过org.apache.luna.client.LunaAdmin对象的createTable方法来创建表和索引,并指定表名、列族名、索引创建请求,mapping文件所在目录路径。也可通过addCollection往已有表中添加索引。查询时通过org.apache.luna.client.LunaAdmin对象的getTable方法来获取Table对象进行scan操作。

说明:

表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。

带有全文索引的HBase表限制:

1、不支持多实例;

2、不支持容灾备份恢复;

3、不支持删除行/列族操作;

4、Solr侧查询不支持强一致性;

代码样例片段

以下代码片段在com.huawei.bigdata.hbase.examples包的“LunaSample”类的testFullTextScan方法中。

  public static void testFullTextScan() throws Exception {
    /**
     * Create create request of Solr. Specify collection name, confset name,
     * number of shards, and number of replication factor.
     */
    Create create = new Create();
    create.setCollectionName(COLLECTION_NAME);
    create.setConfigName(CONFSET_NAME);
    create.setNumShards(NUM_OF_SHARDS);
    create.setReplicationFactor(NUM_OF_REPLICATIONFACTOR);
    /**
     * Create mapping. Specify index fields(mandatory) and non-index
     * fields(optional).
     */
    List<ColumnField> indexedFields = new ArrayList<ColumnField>();
    indexedFields.add(new ColumnField("name", "f:n"));
    indexedFields.add(new ColumnField("cat", "f:t"));
    indexedFields.add(new ColumnField("features", "f:d"));
    Mapping mapping = new Mapping(indexedFields);
    /**
     * Create table descriptor of HBase.
     */
    HTableDescriptor desc = new HTableDescriptor(HBASE_TABLE);
    desc.addFamily(new HColumnDescriptor(TABLE_FAMILY));
    /**
     * Create table and collection at the same time.
     */
    LunaAdmin admin = null;
    try {
      admin = new AdminSingleton().getAdmin();
      admin.deleteTable(HBASE_TABLE);
      if (!admin.tableExists(HBASE_TABLE)) {
        admin.createTable(desc, Bytes.toByteArrays(new String[] { "0", "1", "2", "3", "4" }),
            create, mapping);
      }
      /**
       * Put data.
       */
      Table table = admin.getTable(HBASE_TABLE);
      int i = 0;
      while (i < 5) {
        byte[] row = Bytes.toBytes(i + "+sohrowkey");
        Put put = new Put(row);
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("n"), Bytes.toBytes("ZhangSan" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("t"), Bytes.toBytes("CO" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("d"), Bytes.toBytes("Male, Leader of M.O" + i));
        table.put(put);
        i++;
      }

      /**
       * Scan table.
       */
      Scan scan = new Scan();
      SolrQuery query = new SolrQuery();
      query.setQuery("name:ZhangSan1 AND cat:CO1");
      Filter filter = new FullTextFilter(query, COLLECTION_NAME);
      scan.setFilter(filter);
      ResultScanner scanner = table.getScanner(scan);
      LOG.info("-----------------records----------------");
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        for (Cell cell : r.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("-------------------end------------------");
      /**
       * Delete collection.
       */
      admin.deleteCollection(HBASE_TABLE, COLLECTION_NAME);

      /**
       * Delete table.
       */
      admin.deleteTable(HBASE_TABLE);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      /**
       * When everything done, close LunaAdmin.
       */
      admin.close();
    }
  }

解释

(1)创建索引请求

(2)创建表描述符

(3)获取LunaAdmin对象,LunaAdmin提供了建表和索引、添加索引、检查表是否存在、检查索引是否存在、删除索引和删除表等功能。

(4)调用LunaAdmin的建表方法。

(5)往表中插入数据。

(6)构造全文索引条件,设置FullTextFilter,进行查询。

(7)删除索引。

(8)删除表。

(9)关闭admin资源。

注意事项

  • 创建表和索引都必须不存在。
  • 必须使用LunaAdmin获取Table对象进行scan操作。
作者 east
大数据开发 2月 21,2021

国外大公司Pig常见面试题

1)区分Hadoop MapReduce和Pig

Hadoop MapReduce是 编译语言 , 抽象级别低 , 代码需要更多行代码 ,
代码效率代码效率很高。

Pig是脚本语言,抽象级别高。pig与Hadoop MapReduce相比,代码行更少。
代码效率相对较低。

2)比较Apache Pig和SQL。

Apache Pig与SQL的区别在于ETL的用法,惰性评估,在管道中任何给定时间点存储数据,支持管道拆分和显式声明执行计划。 SQL围绕查询产生单个结果。 SQL没有用于拆分数据处理流并将不同的运算符应用于每个子流的内置机制。
Apache Pig允许将用户代码包括在管道的任何位置,而如果要在SQL中使用的数据首先需要导入到数据库中,然后开始清理和转换过程。

3)说明在Apache Pig中进行编程时对MapReduce的需求。

Apache Pig程序使用称为Pig Latin的查询语言编写,与SQL查询语言相似。为了执行查询,需要执行引擎。 Pig引擎将查询转换为MapReduce作业,因此MapReduce充当执行引擎,并且是运行程序所必需的。

4)说明BloomMapFile。

BloomMapFile是一个类,它扩展了MapFile类。它以HBase表格式使用,以使用动态Bloom筛选器为密钥提供快速的成员资格测试。

5) bag in Pig 是什么意思?

元组的集合在Apache Pig中称为包

6)Pig脚本中的foreach操作的用途是什么?

Apache Pig中的FOREACH操作用于将转换应用于数据包中的每个元素,以便执行相应的操作以生成新的数据项。

语法-FOREACH data_bagname GENERATE exp1,exp2

7)解释Pig中不同的复杂数据类型。

Apache Pig支持3种复杂的数据类型-

映射-这些是使用#连接在一起的键值存储。
元组-类似于表格中的行,其中不同的项目之间用逗号分隔。元组可以具有多个属性。
袋-无序的元组集合。包允许多个重复的元组。
8)Flatten在Pig中做什么?

有时,在元组或包中有数据,如果我们想从该数据中删除嵌套级别,则可以使用Pig中的Flatten修饰符。展平未套袋和元组。对于元组,Flatten运算符将用元组的字段代替元组,而取消嵌套的包有点复杂,因为它需要创建新的元组。

通过研究有趣的Pig实时示例来掌握Hadoop

9)用户如何与Apache Pig中的shell交互?

使用Grunt即Apache Pig的交互式外壳,用户可以与HDFS或本地文件系统进行交互。要启动Grunt,用户应该不使用任何命令来调用Apache Pig –

执行“ pig –x local”命令将出现提示-


grunt >

通过在PIG_CLASSPATH中设置配置,可以在本地模式或集群模式下运行PigLatin脚本。

要退出grunt shell,请按CTRL + D或直接键入exit。

10)Apache Pig脚本使用哪些调试工具?

描述和解释是Apache Pig中重要的调试实用程序。

当尝试调试错误或优化PigLatin脚本时,explain实用程序对Hadoop开发人员很有帮助。 describe可以应用于脚本中的特定别名,也可以应用于grunt交互式shell中的整个脚本。说明实用程序会生成几个文本格式的图形,可以将其打印到文件中。
describe调试实用程序在编写Pig脚本时对开发人员很有帮助,因为它显示了脚本中的关系模式。对于尝试学习Apache Pig的初学者,可以使用describe实用程序来了解每个操作员如何更改数据。


11)在Apache Pig中用于说明什么?

在大型数据集上执行猪脚本通常需要很长时间。为解决此问题,开发人员在示例数据上运行了Pig脚本,但是选择的示例数据有可能无法正确执行您的Pig脚本。例如,如果脚本具有联接运算符,则示例数据中至少应有一些记录具有相同的键,否则联接操作将不返回任何结果。为了解决这类问题,使用了说明。说明从数据中获取样本,并且每当遇到诸如删除数据的联接或过滤器之类的运算符时,它都会通过对记录进行修改以使它们满足

他条件。说明仅显示每个阶段的输出,但不运行任何MapReduce任务。

12)解释Pig脚本的执行计划

或者

区分Apache Pig脚本的逻辑和物理计划

在执行pig脚本期间创建逻辑和物理计划。 Pig脚本基于解释器检查。逻辑计划是在语义检查和基本解析之后生成的,在逻辑计划的创建过程中不会进行任何数据处理。对于Pig脚本中的每一行,都会对运算符执行语法检查,并创建一个逻辑计划。每当脚本中遇到错误时,都会引发异常并结束程序执行,否则脚本中的每个语句都有自己的逻辑计划。

逻辑计划在脚本中包含运算符的集合,但不包含运算符之间的边缘。

生成逻辑计划后,脚本执行将移至物理计划,其中有关于Apache Pig将用来执行Pig脚本的物理运算符的描述。物理计划或多或少类似于一系列MapReduce作业,但是该计划没有任何关于如何在MapReduce中执行的参考。在创建物理计划时,将协同逻辑运算符转换为3个物理运算符,即–本地重排,全局重排和打包。加载和存储功能通常在物理计划中得到解决。

13)您对Apache Pig的区分大小写了解多少?

很难说Apache Pig是区分大小写还是不区分大小写。例如,pig中用户定义的函数,关系和字段名称区分大小写,即函数COUNT与函数计数不相同,或者X = load’foo’与x = load’foo’不相同。另一方面,Apache Pig中的关键字不区分大小写,即LOAD与load相同。

14)您能想到哪些Apache Pig用例?

Apache Pig大数据工具特别用于迭代处理,原始数据研究和传统ETL数据管道。由于Pig可以在模式未知,不一致或不完整的情况下运行,因此它被研究人员广泛使用,他们希望在清理数据并将其加载到数据仓库之前利用这些数据。

例如,要建立行为预测模型,网站可以使用它来跟踪访客对各种类型的广告,图像,文章等的响应。

15)区分PigLatin和HiveQL

必须在HiveQL中指定架构,而在PigLatin中是可选的。
HiveQL是一种声明性语言,而PigLatin是程序性语言。
HiveQL遵循平坦的关系数据模型,而PigLatin具有嵌套的关系数据模型。
阅读有关Pig vs.Hive的更多信息

16)PigLatin是一种强类型语言吗?如果是,那么您是如何得出结论的?

在强类型语言中,用户必须预先声明所有变量的类型。在Apache Pig中,当您描述数据的模式时,它期望数据以您提到的相同格式出现。但是,当模式未知时,脚本将在运行时适应实际的数据类型。因此,可以说PigLatin在大多数情况下是强类型的,但在极少数情况下是轻度键入的,即它继续处理不符合其期望的数据。

17)您对Pig的内包和外包有什么了解?

包内部的关系称为内包,而外包只是Pig中的关系

18)区分GROUP和COGROUP运算符。

GROUP和COGROUP运算符是相同的,并且可以使用一个或多个关系。 GROUP运算符通常用于按单个关系对数据进行分组以提高可读性,而COGROUP可以用于按2个或更多关系对数据进行分组。 COGROUP更像是GROUP和JOIN的组合,即它基于列对表进行分组,然后将它们联接到分组的列上。一次最多可以组合127个关系。

19)解释一下Apache Pig中COUNT_STAR和COUNT函数之间的区别吗?

在计算袋中元素数时,COUNT函数不包括NULL值,而COUNT_STAR(0函数在计数时包括NULL值。

20)Apache Pig提供了哪些各种诊断运算符?

转储运算符-用于在屏幕上显示Pig Latin语句的输出,以便开发人员可以调试代码。
描述操作员-在Apache Pig面试问题10中解释
解释操作员-在apache Pig面试中解释问题-10号
说明操作员-在apache pig面试问题-11中解释
21)您将如何合并两个或多个关系的内容,并将单个关系分为两个或多个关系?

这可以使用UNION和SPLIT运算符来完成。

22)我有一个关系R。如何从关系R中获得前10个元组?

20)Apache Pig提供了哪些各种诊断运算符?

转储运算符-用于在屏幕上显示Pig Latin语句的输出,以便开发人员可以调试代码。
描述操作员-在Apache Pig面试问题10中解释
解释操作员-在apache Pig面试中解释问题-10号
说明操作员-在apache pig面试问题-11中解释
21)您将如何合并两个或多个关系的内容,并将单个关系分为两个或多个关系?

这可以使用UNION和SPLIT运算符来完成。

22)我有一个关系R。如何从关系R中获得前10个元组?

TOP()函数从一包元组或一个关系中返回前N个元组。 N与要比较其值的列以及关系R一起作为参数传递给函数top()。

23)Pig和Hive之间有什么共同点?

HiveQL和PigLatin都将命令转换为MapReduce作业。
它们不能用于OLAP事务,因为很难执行低延迟查询。
24)Apache Pig支持哪些Java UDF类型?

代数,评估和过滤器功能是Pig中支持的各种UDF类型。

25)您在HDFS目录中有一个名为employee.txt的文件,其中包含100条记录。您只想查看employee.txt文件中的前10条记录。您将如何做?

第一步是将文件Employee.txt加载到关系名称为Employee的文件中。

员工数据的前10条记录可以使用limit运算符获取-

结果=限制员工10。

26)解释Apache Pig中的标量数据类型。

integer,float,double,long,bytearray和char数组是Apache Pig中可用的标量数据类型。

27)用户如何与Apache Pig中的HDFS交互?

使用grunt外壳。

28)在Apache Pig中使用过滤器有什么用?

就像SQL中的where子句一样,Apache Pig具有用于根据给定条件或谓词提取记录的过滤器。如果谓词或条件变为true,则记录将通过管道传递。谓词包含各种运算符,例如==,<=,!=,> =。

例子 –

X =将“输入”加载为(名称,地址)

Y =通过符号匹配“ Mr. *”的X;

29)什么是pig的UDF?

如果内置运算符不提供某些功能,则程序员可以通过使用其他编程语言(例如Java,Python,Ruby等)编写用户定义的函数来实现这些功能。然后可以将这些用户定义的函数(UDF)嵌入到Pig Latin中脚本。

30)您可以在Apache Pig脚本中加入多个字段吗?

是的,可以在PIG脚本中联接多个字段,因为联接操作从一个输入获取记录,然后将它们与另一输入联接。这可以通过为每个输入指定键来实现,当键相等时,两行将连接在一起。

31)Pig是否支持多行命令?

是的

作者 east
flume 2月 21,2021

Flume案例研究:接收Twitter数据


问题陈述
在此案例研究中,将flume代理配置为从Twitter检索数据。我们知道,Twitter是巨大的数据来源,具有人们的意见和偏好。数据可用于分析舆论或对特定主题或产品进行评论。基于推文数据和位置可以进行各种类型的分析。来自flume的数据可用于通过Streaming API使用Apache Spark进行实时处理。 Spark Streaming用于使用各种数据源(例如Kafka,Flume或TCP套接字)处理实时数据。它还支持Twitter流API。通过使用Flume,我们可以构建一个容错系统,该系统提供实时数据并将数据的副本保存在所需的位置。 Spark还内置了机器学习算法,可以使分析更快,更可靠且具有容错能力。

这样,我们可以使用Spark实时获取所需的结果,并将数据存储在数据库中,以便使用Hadoop进行更深入的分析。现在,我们构建一个简单的flume代理,该代理具有Twitter源和接收器,Spark可通过接收器进行数据检索。为了防止数据丢失,我们将使用自定义接收器构建flume代理。即使spark产生故障,由于数据传输中的事务处理功能,数据仍保留在通道中。

拟议的解决方案
现在,必须为我们架构中的各种重要组件设置配置。这样的组件之一就是被配置为从Twitter读取数据的源。源为“ source_read”的flume代理“ agent1”已配置为自定义类型源。为了访问数据,通过注册应用程序,twitter提供了凭证,用户可以使用凭证来检索数据。如果我们需要包含这些单词的特定推文,我们也可以设置关键字。在对特定主题或产品进行分析时,这非常有用。

Cloudera提供了必须包含在Flume类路径中的jar文件才能访问这些类。可以通过在“ flume-env.sh”配置文件中添加jar的路径来完成。如果需要设置其他参数(例如代理),则必须使用源代码重新构建jar。

agent1.sources.source_read.type =
    com.cloudera.flume.source.TwitterSource
agent1.sources.source_read.channels = MemChannel
agent1.sources.source_read.consumerKey = 
agent1.sources.source_read.consumerSecret = 
agent1.sources.source_read.accessToken = 
agent1.sources.source_read.accessTokenSecret = 
agent1.sources.source_read.keywords = hadoop

channel的配置如下:

agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

使用参数“ type”传递自定义sink的代码。 代码的jar文件必须添加到flume类路径中。 定义了spark的IP地址和端口。

agent1.sinks = spark_dump
agent1.sinks.spark_dump.type = org.apache.spark.streaming.flume.sink.SparkSink agent1.sinks.spark_dump.hostname =
agent1.sinks.spark_dump.port =
agent1.sinks.spark_dump.channel = memory1

启动flume:

$ bin/flume-ng agent -n $agent_name -c conf -f
    conf/flume-conf.properties.template
作者 east
Spark 2月 21,2021

Spark编程语言选择:scala对比python

为Apache Spark选择编程语言是一个主观问题,因为特定的数据科学家或数据分析师喜欢将Python或Scala用于Apache Spark的原因可能并不总是适用于其他人。根据独特的用例或要开发的特定类型的大数据应用程序-数据专家确定哪种语言更适合Apache Spark编程。对于数据科学家来说,学习Scala,Python,R和Java以便在Spark中进行编程并根据任务的功能解决方案的效率选择首选语言非常有用。让我们探索一些重要因素,然后再决定将Scala vs Python作为Apache Spark的主要编程语言。

Python vs Scala

Hadoop更快的表亲Apache Spark框架具有用于以各种语言(Java,Scala和Python)进行数据处理和分析的API。出于讨论的目的,我们将Java从大数据分析和处理的比较列表中删除,因为Java太冗长了。 Java不支持读取 – 评估 – 打印循环(REPL)选择编程语言,大数据处理时,这是一个重大的大忌。

Scala和Python都易于编程,可帮助数据专家快速提高生产率。数据科学家通常更喜欢同时学习Spark的Scala和Spark的Python,但是Python通常是Apache Spark第二受欢迎的语言,因为Scala最早出现在该语言中。但是,以下一些重要因素可以帮助数据科学家或数据工程师根据他们的要求选择最佳的编程语言:

Scala与Python进行Spark编程

1)Scala vs Python-性能
由于使用JVM,Scala编程语言的数据分析和处理速度比Python快10倍。当使用Python编程代码来调用Spark库时,性能是中等的,但是如果涉及的处理量比Python代码要慢得多,则它会比Scala等效代码慢得多。 Python解释器PyPy具有一个内置的JIT(即时)编译器,该编译器速度非常快,但不提供对各种Python C扩展的支持。在这种情况下,带有C扩展库的CPython解释器要优于PyPy解释器。

在Apache Spark上使用Python作为Scala的性能开销,但是重要性取决于您在做什么。当内核数量较少时,Scala比Python更快。随着内核数量的增加,Scala的性能优势开始减弱。

当使用大量内核时,性能不是选择Apache Spark编程语言的主要驱动因素。但是,当存在重要的处理逻辑时,性能是主要因素,而Scala肯定会比Python提供更好的性能,从而可以针对Spark进行编程。

2)Scala vs Python-学习曲线
使用Apache Spark进行编程时,Scala语言具有多种语法功能,因此在学习Scala for Spark时,大数据专业人员必须非常谨慎。程序员有时可能会疯狂地发现Scala用于在Spark中进行编程的语法。 Scala中的库很少,因此很难定义经验不足的程序员可以理解的随机符号运算符。使用Scala时,开发人员需要专注于代码的可读性。与Java或Python相比,Scala是一种语法灵活的复杂语言。对Scala开发人员的需求不断增长,因为大数据公司重视可以掌握Apache Spark中数据分析和处理的高效健壮编程语言的开发人员。

由于Java的语法和标准库,Python比较容易学习。但是,对于高并发和可扩展的系统(例如SoundCloud或Twitter),Python并不是理想的选择。

Learning Scala丰富了程序员对类型系统中各种新颖抽象,新颖的函数式编程功能和不可变数据的知识。

3)Scala vs Python –并发
大数据系统的复杂基础结构需要一种编程语言,该语言具有跨多个数据库和服务进行集成的能力。 Scala凭借Play框架赢得了这场比赛,该框架提供了许多异步库和反应式内核,可以轻松地与各种并发原语(例如Akka在大数据生态系统中的参与者)集成。 Scala允许开发人员编写高效,可读性和可维护性的服务,而无需将程序代码悬挂到不可读的回调蜘蛛网中。相反,Python确实使用uwsgi支持重量级的进程派生,但它不支持真正的多线程。

在将Python用于Spark时,无论进程具有多少线程,Python进程一次只能激活一个CPU。这有助于每个CPU内核处理一个进程,但是这样做的缺点是,每当要部署新代码时,都需要重新启动更多进程,这还需要额外的内存开销。在这些方面,Scala更加高效且易于使用。

4)Scala与Python – TypeSafety
使用Apache Spark进行编程时,开发人员需要根据不断变化的需求不断重构代码。 Scala是一种静态类型的语言,尽管由于经典的类型推断机制,它看起来像一种动态类型的语言。作为静态类型的语言,Scala仍然为编译器提供了捕获编译时错误的功能。

重构静态语言(例如Scala)的程序代码比重构动态语言(例如Python)要容易得多,而且没有麻烦。在修改Python程序代码后,开发人员通常会遇到困难,因为它比修复较旧的bug会产生更多的bug。 Python中的Typecheck实际上征服了Python的鸭子式哲学。使用Scala for Spark时要缓慢而安全,要比使用Python for Spark时要快而死。

对于较小的临时实验,Python是对抗Spark的有效选择,但对于生产中的大型软件工程,它无法像静态类型的语言Scala那样有效地扩展。

5)Scala vs Python –易于使用
Scala和Python语言在Spark上下文中具有同等的表现力,因此通过使用Scala或Python,可以实现所需的功能。无论哪种方式,程序员都会创建Spark内容并在其上调用函数。 Python是比Scala更用户友好的语言。 Python不太冗长,因此开发人员可以轻松地在Python中为Spark编写脚本。易于使用是一个主观因素,因为它取决于程序员的个人喜好。

6)Scala vs Python –高级功能
Scala编程语言具有多种存在性类型,宏和隐式。 Scala的神秘语法可能使尝试使用开发人员可能无法理解的高级功能变得困难。但是,Scala的优势在于在重要的框架和库中使用这些强大的功能。

话虽如此,Scala没有足够的数据科学工具和库(例如Python)用于机器学习和自然语言处理。 SparkMLib –机器学习库仅具有较少的ML算法,但它们是大数据处理的理想选择。 Scala缺乏良好的可视化和本地数据转换。 Scala绝对是Spark Streaming功能的最佳选择,因为Python Spark Streaming支持并不像Scala那样先进和成熟。

总结:针对Apache Spark的Scala与Python
“ Scala速度更快,并且易于使用,而Python速度较慢,但​​是非常易于使用。”

Apache Spark框架是用Scala编写的,因此了解Scala编程语言可以帮助大数据开发人员轻松地深入源代码(如果某些功能无法按预期运行)。使用Python会增加出现更多问题和bug的可能性,因为很难在2种不同语言之间进行翻译。使用Scala for Spark可以访问Spark框架的最新功能,因为它们首先在Scala中可用,然后移植到Python。

选择Scala vs Python for Spark取决于最适合项目需求的功能,因为每个功能各有优缺点。在选择用于使用Apache Spark进行编程的语言之前,开发人员必须学习Scala和Python以熟悉其功能。在学习了Python和Scala之后,就应该很容易决定何时将Scala用于Spark和何时将Python用于Spark。在Apache Spark中编程的语言选择纯粹取决于要解决的问题。

我们很想知道您对您选择哪种语言进行Apache Spark编程的意见。请在下面的评论中提及您的选择。

作者 east
Kafka, Spark 2月 21,2021

spark和kafka在数据流处理对比

2625 / 5000

在对Spark Streaming和Kafka Streaming进行比较并得出何时使用哪个比较之前,让我们首先对Data Streaming的基础知识有一个清晰的了解:它是如何出现的,流是什么,如何运行,其协议和用例。 。 数据流如何诞生? 从那时起,数据一直是操作的重要组成部分。数据构成了整个操作结构的基础,其中数据被进一步处理以在系统的不同实体模块中使用。这就是为什么它已成为IT领域的典型代表。 随着技术的发展,数据的重要性变得更加突出。数据处理中使用的方法已经发生了显着变化,以适应软件机构对数据输入的不断增长的需求。 随着时间的增长,数据处理的时间框架急剧缩短,以至于立即处理的输出有望满足最终用户的更高期望。随着人工智能的出现,人们强烈希望为看起来像人类的最终用户提供实时帮助。 此要求仅取决于数据处理强度。越快越好。因此,结果是处理数据的方式发生了变化。较早之前,在指定的延迟之后,有成批的输入被输入到系统中,从而将处理后的数据作为输出。 目前,这种延迟(延迟)是输入性能,处理时间和输出的结果,这已成为性能的主要标准之一。为了确保高性能,延迟必须最小化到几乎是实时的程度。 这就是数据流出现的方式。在数据流处理中,实时数据流作为输入传递,必须立即进行处理,并实时传递输出信息流。

什么是数据流?

数据流传输是一种方法,其中不按常规的批处理方式发送输入,而是以连续流的形式发布该流,并按原样使用算法进行处理。还以连续数据流的形式检索输出。 该数据流是使用数千个源生成的,这些源同时以小尺寸发送数据。这些文件背对背发送时形成连续的流程。这些可能是大量发送的日志文件以进行处理。 这种作为流出现的数据必须被顺序处理以满足(几乎)连续实时数据处理的要求。

为什么需要数据流?


随着企业在线人数的增加以及随之而来的对数据的依赖,人们已经意识到了数据的方式。数据科学和分析技术的出现导致大量数据的处理,为实时数据分析,复杂数据分析,实时流分析和事件处理提供了可能性。

当输入数据大小庞大时,需要进行数据流传输。我们需要先存储数据,然后再将其移动以进行批处理。由于数据以多批次的形式存储,因此涉及大量时间和基础架构。为了避免所有这些情况,信息以小数据包的形式连续流传输以进行处理。数据流提供超可伸缩性,这仍然是批处理的挑战。

使用数据流传输的另一个原因是要提供近乎实时的体验,其中最终用户在输入数据时会在几秒钟或几毫秒内获得输出流。

当数据源似乎无穷无尽且无法为批处理中断时,也需要进行数据流传输。 IoT传感器在此类别中发挥了重要作用,因为它们会生成连续的读数,需要对其进行处理以得出推论。

数据流如何发生?


为了通过实时处理数据做出即时决策,可以进行数据流传输。 根据系统的规模,复杂性,容错性和可靠性要求,您可以使用工具,也可以自己构建。

自行构建它意味着您需要在编码角色之前将事件放置在诸如Kafka之类的消息代理主题中。 这里的参与者是一段代码,旨在接收来自代理中的问题的事件(即数据流),然后将输出发布回代理。

Spark是第一代Streaming Engine,它要求用户编写代码并将其放置在actor中,他们可以进一步将这些actor连接在一起。 为了避免这种情况,人们经常使用Streaming SQL进行查询,因为它使用户可以轻松地查询数据而无需编写代码。 流SQL是对SQL的扩展支持,可以运行流数据。 此外,由于SQL在数据库专业人员中已得到很好的实践,因此执行流式SQL查询将更加容易,因为它基于SQL。

这是用例的流式SQL代码,在这种情况下,如果池中的温度在2分钟内下降了7度,则必须向用户发送警报邮件。

@App:name("Low Pool Temperature Alert")

@App: description('An application which detects an abnormal decrease in swimming pools temperature.')

@source(type='kafka',@map(type='json'),bootstrap.servers='localhost:9092',topic.list='inputStream',group.id='option_value',threading.option='single.thread')

define stream PoolTemperatureStream(pool string, temperature double);

@sink(type='email', @map(type='text'), ssl.enable='true',auth='true',content.type='text/html', username='sender.account', address='sender.account@gmail.com',password='account.password', subject="Low Pool Temperature Alert", to="receiver.account@gmail.com")

define stream EmailAlertStream(roomNo string, initialTemperature double, finalTemperature double);

--Capture a pattern where the temperature of a pool decreases by 7 degrees within 2 minutes

@info(name='query1')

from every( e1 = PoolTemperatureStream ) -> e2 = PoolTemperatureStream [e1.pool == pool and (e1.temperature + 7.0) >= temperature]

    within 2 min

select e1.pool, e1.temperature as initialTemperature, e2.temperature as finalTemperature

insert into EmailAlertStream;

Spark SQL提供DSL(特定于域的语言),这将有助于以不同的编程语言(例如Scala,Java,R和Python)操纵DataFrame。 它使您可以使用SQL或DataFrame API对Spark程序内部的结构化数据执行查询。 Kafka等新一代流引擎也支持Kafka SQL或KSQL形式的Streaming SQL。

尽管流处理的过程大致相同,但此处重要的是根据用例要求和可用的基础结构选择流引擎。 在得出结论之前,什么时候使用Spark Streaming和什么时候使用Kafka Streaming,让我们首先探索Spark Streaming和Kafka Streaming的基础知识,以更好地理解。

什么是Spark Streaming?

Spark Streaming是核心Spark API的扩展,可让其用户执行实时数据流的流处理。 它从Kafka,Flume,Kinesis或TCP套接字等来源获取数据。 可以使用复杂的算法对这些数据进行进一步处理,这些复杂的算法使用诸如map,reduce,join和window之类的高级功能表示。 最终输出(即处理后的数据)可以推送到诸如HDFS文件系统,数据库和实时仪表板之类的目标。

让我们仔细看看Spark Streaming的工作原理。 Spark Streaming从数据源以数据流的形式获取实时输入,并将其进一步分为几批,然后由Spark引擎处理以生成大量输出。 Spark Streaming允许您将机器学习和图形处理用于数据流以进行高级数据处理。它还提供了代表连续数据流的高级抽象。 数据流的这种抽象称为离散流或DStream。该DStream可以通过对Kafka,Flume和Kinesis等来源的数据流或其他DStream进行高级操作来创建。 这些DStream是RDD(弹性分布式数据集)的序列,RDD是分布在计算机集群上的多个只读数据集。这些RDD以容错方式进行维护,使其具有高度鲁棒性和可靠性。DStreams序列Spark Streaming使用Spark Core的快速数据调度功能来执行流分析。从诸如Kafka,Flume,Kinesis等之类的源中以迷你批的形式摄取的数据用于执行数据流处理所需的RDD转换。


Spark Streaming使您可以根据需要使用Scala,Java或Python编写程序来处理数据流(DStreams)。由于此处将用于批处理的代码用于流处理,因此使用Spark Streaming实现Lambda体系结构(将批处理和流处理混合在一起)变得容易得多。但这是以等于最小批处理持续时间的延迟为代价的。 Spark Streaming中的输入源 Spark支持主要来源,例如文件系统和套接字连接。另一方面,它也支持高级资源,例如Kafka,Flume,Kinesis。只有添加额外的实用程序类,才能获得这些出色的资源。 您可以使用以下工件链接Kafka,Flume和Kinesis。

kafka:spark-streaming-kafka-0-10_2.12

flume:spark-streaming-flume_2.12

Kinesis:spark-streaming-kinesis-asl_2.12

什么是Kafka流媒体?

Kafka Stream是一个客户端库,可让您处理和分析从Kafka接收的数据输入,并将输出发送到Kafka或其他指定的外部系统。 Kafka依赖于流处理概念,例如: 准确区分事件时间和处理时间 窗口支持 高效直接的应用程序状态管理 通过利用Kafka中的生产者和消费者库来利用Kafka的本机功能,从而简化了应用程序开发,从而使其更加直接和快捷。正是由于这种原生的Kafka潜力,使得Kafka流式传输可以提供数据并行性,分布式协调,容错性和操作简便性。 Kafka Streaming中的主要API是提供多个高级运算符的流处理DSL(特定于域的语言)。这些运算符包括:筛选器,映射,分组,窗口,聚合,联接和表的概念。 Kafka中的消息传递层对进一步存储和传输的数据进行分区。根据状态事件在Kafka流中对数据进行分区,以进行进一步处理。通过将拓扑划分为多个任务来缩放拓扑,其中为每个任务分配了输入流中的分区列表(Kafka主题),从而提供了并行性和容错能力。

Kafka可以进行状态转换,与Spark Streaming中的批处理不同。 它在其主题内存储状态,流处理应用程序将其用于存储和查询数据。 因此,其所有操作均受状态控制。 这些状态还用于连接主题以形成事件任务.Kafka中基于状态的操作 这是由于Kafka中基于状态的操作使其具有容错能力,并允许从本地状态存储中自动恢复。 Kafka Streaming中的数据流是使用表和KStreams的概念构建的,这有助于它们提供事件时间处理。

Spark Streaming与Kafka Streaming:

何时使用什么 Spark Streaming使您可以灵活地选择任何类型的系统,包括具有lambda架构的系统。但是,Spark Streaming的延迟范围从毫秒到几秒。 如果延迟不是一个重要的问题,并且您正在寻找在源兼容性方面的灵活性,那么Spark Streaming是最佳选择。可以在EC2,Hadoop YARN,Mesos或Kubernetes上使用独立的集群模式运行Spark Streaming。 它可以访问HDFS,Alluxio,Apache Cassandra,Apache HBase,Apache Hive和许多其他数据源中的数据。它提供了容错能力,还提供了Hadoop分发。 此外,在Spark流式传输的情况下,您不必为批处理和流式传输应用程序分别编写多个代码,在这种情况下,单个系统可以同时满足这两种情况。 另一方面,如果延迟是一个重要问题,并且必须坚持以短于毫秒的时间范围进行实时处理,则必须考虑使用Kafka Streaming。由于事件驱动处理,Kafka Streaming提供了高级的容错能力,但是与其他类型的系统的兼容性仍然是一个重要的问题。此外,在高可伸缩性要求的情况下,Kafka具有最佳的可伸缩性,因此非常适合。

如果您要处理从Kafka到Kafka的本机应用程序(输入和输出数据源都在Kafka中),则Kafka流式传输是您的理想选择。 虽然Kafka Streaming仅在Scala和Java中可用,但Spark Streaming代码可以用Scala,Python和Java编写。 结束语 随着技术的发展,数据也随着时间大量增长。处理此类海量数据的需求以及对实时数据处理的日益增长的需求导致了数据流的使用。通过几种数据流方法,尤其是Spark Streaming和Kafka Streaming,全面了解用例以做出最适合需求的最佳选择变得至关重要。 在用例中优先考虑需求对于选择最合适的流技术至关重要。鉴于事实,Spark Streaming和Kafka Streaming都是高度可靠的,并且广泛推荐作为Streaming方法,它在很大程度上取决于用例和应用程序,以确保最佳效果。 在本文中,我们指出了两种流传输方法的专业领域,以便为您提供更好的分类,这可以帮助您确定优先级并做出更好的决策。

作者 east
flume 2月 20,2021

Flume处理Spooling Directory Source数据太慢优化

一个数据采集处理系统,架构如下:

日志数据 -> flume -> kafka -> Spark Streaming。

flume到kafka的数据处理时间是1秒多;而spark Streaming的数据处理时间是十几毫秒。

flume方面:

flume数量不够:增加日志服务器以增加并行度;

(1)自身:增加内存flume-env.sh 4-6g

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

(2)找朋友:多个目录,多个spooling directory source同时采集

(3)taildir source的batchsize修改为合适的值

( 4 ) flume要读取文件夹如果文件太多,要按最新或最早顺序读取时,会很影响速度。

# batchsize是每次处理的数据条数越高,处理的数据越多,延迟越高。

kafka数据积压的问题,主要的解决办法是:

(1)增加Kafka对应的分区数(比如:期望处理数据的总吞吐量是100M/s。但是实际最多每个分区的生产能力和消费能力的最小值是20M/s,那么我们就需要设置5个或者6个分区),2)要求下一级消费者配套增加CPU核数,动态增加Kafka服务器集群。

(2)kafka ack设成0(ack有0有1有-1。0的可靠性最差,但是速度最快)

注:ack有3个可选值,分别是1,0,-1。

ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。

ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。

ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。

(3) a1.channels.c1.type = memory memory类型可能会丢失数据,但是速度最快。

作者 east
bug清单, flume 2月 20,2021

Flume Spooling Directory Source 采集NullPointerException

采用flume的Spooling Directory Source,突然遇到下面的错误

(org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run:277)  - FATAL: Spool Directory source source1: { spoolDir: /home/work/local/log }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.NullPointerException
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:159)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.flume.serialization.DurablePositionTracker.initReader(DurablePositionTracker.java:171)
    at org.apache.flume.serialization.DurablePositionTracker.<init>(DurablePositionTracker.java:158)
    at org.apache.flume.serialization.DurablePositionTracker.getInstance(DurablePositionTracker.java:76)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.openFile(ReliableSpoolingFileEventExtReader2.java:561)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.getNextFile(ReliableSpoolingFileEventExtReader2.java:511)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.readEvents(ReliableSpoolingFileEventExtReader2.java:264)
    at org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run(SpoolDirectoryExtSource2.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

重启flume 后发现问题依旧。试验把flume 采集的日志放到新目录,重启flume采集新目录发现没问题了。看来是旧目录/home/work/local/log 有文件变动造成的。

例如 有.flumespool隐藏目录,该目录下有文件.flumespool-main.meta隐藏文件,用来记录flume读取文件的位置,发现该记录停止在flume出问题的时间。再查看其它正常运行的机器的相同路径及文件,并没有发现该文件,于是将该文件移到其它目录下,重启flume,此时发现flume成功运行!

作者 east
运维 2月 19,2021

linode如何迁移服务器

linode是个性价比的服务器,广受广大站长喜欢。由于种种原因,可以很方便进行不同地区迁移。例如linode日本地区服务器,在中国用移动网络不太稳定,要迁移到加州。

方法/步骤

  1. 虽然迁移服务器理论上讲只是ip变了,不影响服务上网站运行。但为防止万一,最后进行备份。登录linode服务器,找到要迁移的服务器,选择”Migrate”
  2. 在下一步的界面中,勾选同意,选择要迁移的服务器地区。最后点击“Enter Migrate Queue”
  3. 完成上一步后,可以在服务器的status上看到迁移的进度。迁移完成后会自动重启。
  4. 迁移服务后,由于ip变了,所以要在域名解析中,把新ip替换旧的ip。
作者 east

上一 1 … 29 30 31 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.