Apache Flink处理IoT复杂数据流程案例

使用Apache Flink处理IoT复杂数据是一项涉及多个步骤和组件的任务,包括数据接入、数据清洗、实时处理、状态管理、窗口计算、以及结果输出等。以下是一个全面且详细的Flink流处理框架,结合理论和实际应用,以处理IoT数据为主线。

1. 引入依赖和设置环境

首先,需要在你的项目中引入Flink所需的依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
xmlCopy Code<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

2. 创建Flink执行环境

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}
javaCopy Codeimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}

3. 数据接入

通常,IoT数据会通过Kafka或其他消息队列接入。假设使用Kafka作为数据源:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        // 进一步处理...
    }
}




4. 数据清洗和解析

实际的IoT数据通常是JSON格式的字符串,需要进行解析和清洗:

import org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}
javaCopy Codeimport org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}

5. 定义时间窗口和处理函数

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer and parsing setup...

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        // 进一步处理...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }
    }
}

6. 输出结果到外部系统

处理后的数据通常需要写到数据库、文件系统或者其他外部系统。以写入到Kafka为例:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}
javaCopy Codeimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}

7. 完整代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;

public class IoTDataProcessing {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;

        // constructor, getters, setters...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }

        @Override
        public String toString() {
            return "AggregatedResult{" +
                    "deviceId='" + deviceId + '\'' +
                    ", windowStart=" + windowStart +
                    ", windowEnd=" + windowEnd +
                    ", avgTemperature=" + avgTemperature +
                    ", avgHumidity=" + avgHumidity +
                    '}';
        }
    }
}

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注