Flink-Connectors(连接器)(1)JDBC

news/2024/5/19 2:10:22 标签: mysql, flink, jdbc, java

什么是连接器

预定义的源和接收器

​ Flink内置了一些基本数据源和接收器,这些数据源和接收器始终可用。该预定义的数据源包括文件、Mysql、RabbitMq、Kafka、ES

等,同时也支持数据输出到文件、Mysql、RabbitMq、Kafka、ES等。

​ 简单的说:flink连接器就是将某些数据源加载数据输出做了封装(连接器),我们只要引入对应的连接器依赖,即可快速的完成对数据源的加载以及数据的输出。

​ 例如我们使用JDBC 连接器,即可快速的使用JDBC从数据库中加载数据源并支持数据通过JDBC 输出到我们的数据库。

使用JDBC连接器 快速输出到Mysql

必要依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

编码实现

java">package com.leilei;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * @author lei
 * @version 1.0
 * @date 2021/3/13 21:07
 * @desc flink jdbc 连接器
 */
public class FlinkConnector1_JDBC {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        DataStreamSource<VehicleAlarm> source = env.addSource(new MySource());
        String sql = "insert into vehicle_alarm_202103 (`id`,`license_plate`,`plate_color`,`device_time`,`zone`) " +
                "values(?,?,?,?,?)";
        JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.jdbc.Driver")
                .withUrl("jdbc:mysql://xxxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false")
                .withUsername("root")
                .withPassword("root")
                .build();
        /**
         * 使用JDBC连接器 将数据发送到Mysql
         */
        source.addSink(JdbcSink.sink(sql, (ps, vehicle) -> {
            ps.setString(1, vehicle.getId());
            ps.setString(2, vehicle.getLicensePlate());
            ps.setString(3, vehicle.getPlateColor());
            ps.setLong(4, vehicle.getDeviceTime());
            ps.setString(5, vehicle.getZone());
        }, jdbcBuild));
        try {
            env.execute("jdbc-connector");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class VehicleAlarm {
        private String id;
        private String licensePlate;
        private String plateColor;
        private Long deviceTime;
        private String zone;
    }

    /**
     * 自定义数据源
     */
    public static class MySource extends RichSourceFunction<VehicleAlarm> {
     
        @Override
        public void run(SourceContext<VehicleAlarm> ctx) throws Exception {
                long id = System.currentTimeMillis()/1000;
                VehicleAlarm vehicleAlarm = new VehicleAlarm(String.valueOf(id), "川A" + id,
                        "紫", System.currentTimeMillis(), "sc");
                ctx.collect(vehicleAlarm);
                Thread.sleep(10000);
            }

        
        @Override
        public void cancel() {

        }
    }

}


http://www.niftyadmin.cn/n/1327818.html

相关文章

徽信排行榜显示服务器忙是什么意思,视频显示对方忙是什么意思

大家好&#xff0c;我是时间财富网智能客服时间君&#xff0c;上述问题将由我为大家进行解答。视频显示对方忙线中&#xff0c;有以下几种原因&#xff1a;1、视频打过去&#xff0c;响了两声显示对方忙&#xff0c;可能是对方挂掉了&#xff0c;那你就等待一会打过去&#xff…

三维图像处理_CT/MRI等二维图像处理及三维重构的一些看法

时隔数年&#xff0c;又回到知乎&#xff0c;对这个知识分享平台&#xff08;说不上是知识吧&#xff0c;经验&#xff1f;个人理解&#xff1f;&#xff0c;毕竟还算比某引擎搜索的全是广告来的靠谱&#xff09;感觉还算挺亲切的。三维重构或者说三维可视化目前刚好涉及到这一…

macos系统镜像iso_Windows10操作系统iso镜像、微软正版软件下载站:MSDN,我告诉你...

突然发现不知道该写什么&#xff0c;貌似肚里墨水已吐槽干净了。既然没啥干货分享&#xff0c;就分享一些好资源吧&#xff01;MSDN 的全称是 Microsoft Developer Network。这是微软公司面向软件开发者的一种信息服务。而我这次分享的却叫《MSDN&#xff0c;我告诉你&#xff…

win10关机后自动重启_[电脑技巧] 电脑win10能不能设置自动关机?

现在我们经常用电脑下载一些大型软件、文件之类的&#xff0c;但是晚上的时候怎么办呢?总不能陪电脑熬夜吧。这时候就可以启用电脑自动关机了。下面&#xff0c;我就教你们设置win10自动关机的操作方法。1、首先在开始菜单搜索框中输入计划任务&#xff0c;点击任务计划程序2、…

google protobuf 实体类和java对象互转_ProtoBuf为什么被吹出天际

Google在Protocol buffers的官网首页开宗明义&#xff0c;指出Protocol buffers有语言无关、平台无关、可扩展的特性&#xff1b;重要的是在进行序列化时&#xff0c;相比其他结构化数据格式(Json、XML)&#xff0c;它&#xff1a;更小&#xff0c;更快、更简单。先看看定义Pro…

电脑常用音频剪辑软件_5个实用的音频剪辑软件

有音频剪辑需求的人&#xff0c;一定要好好看这篇文章哦&#xff5e;1.蜜蜂剪辑蜜蜂剪辑是一款非常实用的电脑常用音频剪辑软件&#xff0c;同时也是一款专业的视频剪辑软件&#xff0c;可以在Windows、Mac、iOS和Android上使用。蜜蜂剪辑内置了一个无版权的音乐和音效库&#…

调节e18-d80nk的测量距离_重磅发布丨长距离综合管道潜望镜正式亮相|传感器|潜望镜|光学|远距离...

近日&#xff0c;大家熟知的X1-H潜望镜迎来了华丽升级。新品X1-H5管道潜望镜适用于DN300mm以上管道以及长距离箱涵检测&#xff0c;检测距离150m&#xff0c;解决了QV长距离大管径情况下无法高清检测的问题。长距离综合检测超级潜望镜综合检测系统&#xff0c;检测距离150m。标…

docker 远程连接 文件看不到_深入浅出容器学习--Docker数据卷 - yuhaohao

Docker镜像是由多个文件系统(只读层)叠加而成&#xff0c;当启动一个容器的时候&#xff0c;Docker会加载只读镜像层并在其上(镜像栈顶部)添加一个读写层。如果运行中的容器修改了现有的一个已经存在的文件&#xff0c;那该文件将会从读写层下面的只读层复制到读写层&#xff0…