您现在的位置是:主页 > news > 郑州网站设计制作价格/专业seo网络营销公司
郑州网站设计制作价格/专业seo网络营销公司
admin2025/6/24 0:01:46【news】
简介郑州网站设计制作价格,专业seo网络营销公司,网站开发技术指标是什么,海南建设厅评审网站一、concepts 1、表 表可以是虚拟(VIEWS)或常规(TABLES)。VIEWS可以从现有Table对象创建,通常是Table API或SQL查询的结果。TABLES描述外部数据,例如文件,数据库表或消息队列。表三部分标识符&a…
郑州网站设计制作价格,专业seo网络营销公司,网站开发技术指标是什么,海南建设厅评审网站一、concepts
1、表
表可以是虚拟(VIEWS)或常规(TABLES)。VIEWS可以从现有Table对象创建,通常是Table API或SQL查询的结果。TABLES描述外部数据,例如文件,数据库表或消息队列。表三部分标识符&a…
一、concepts
1、表
表可以是虚拟(VIEWS)或常规(TABLES)。VIEWS可以从现有Table对象创建,通常是Table API或SQL查询的结果。TABLES描述外部数据,例如文件,数据库表或消息队列。表三部分标识符:目录、数据库、表名。其中,目录、数据库是可选的。tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");1.1 临时表与永久表
表可以是临时的,并与单个Flink会话的生命周期相关,也可以是永久的,并且在多个Flink会话和群集中可见。永久表需要一个目录(例如Hive Metastore)来维护有关表的元数据。创建永久表后,连接到目录的任何Flink会话都可以看到该表,并且该表将继续存在,直到明确删除该表为止。另一方面,临时表始终存储在内存中,并且仅在它们在其中创建的Flink会话期间存在。这些表对其他会话不可见。它们没有绑定到任何目录或数据库,但可以在一个目录或数据库的名称空间中创建。如果删除了它们的相应数据库,则不会删除临时表。1.2 表的创建
(1)虚拟表tableEnv.createTemporaryView("projectedTable", projTable);
(2)通过连接器(数据源)tableEnvironment.connect(...).withFormat(...).withSchema(...).inAppendMode().createTemporaryTable("tableName")二、table 工程搭建
maven依赖<!--根据目标编程语言,您需要将Java或Scala API添加到项目中,以便使用Table API和SQL定义管道:--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.11.0</version><!--<scope>provided</scope>--></dependency><!--如果要在IDE中本地运行Table API和SQL程序--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.11.0</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.0</version><!-- <scope>provided</scope>--></dependency><!--在内部,表生态系统的一部分在Scala中实现。因此,请确保为批处理和流应用程序都添加以下依赖项:--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.11.0</version><!--<scope>provided</scope>--></dependency><!--如果要实现与Kafka或一组用户定义函数进行交互的自定义格式,则以下依赖关系就足够了,并且可以用于SQL Client的JAR文件:--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.11.0</version><!--<scope>provided</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.11.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.11.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.11.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>RELEASE</version><scope>compile</scope></dependency>1、读取文件创建表,打印在控制台
1.1 流式
package com.flink.sql.environment.readFile;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");//4、读取表字段Table counts = orders.select($("name"),$("age"));//5、转化成DataStream打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(counts, Row.class);rowDataStream.print();env.execute("readFileCreateTableStream");}
}1.2 批式
package com.flink.sql.environment.readFile;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableBatch {public static void main(String[] args) throws Exception {//1、批式环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");//4、读取表字段Table counts = orders.select($("name"),$("age"));//5、转化成DataStream打印在控制台DataSet<Row> result = tEnv.toDataSet(counts, Row.class);result.print();}
}2、group by
2.1 stream
package com.flink.sql.environment.groupBy;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");//4、读取表字段Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count"));//5、转化成DataStream打印在控制台DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tEnv.toRetractStream(select, Row.class);tuple2DataStream.print();env.execute("readFileCreateTableStream");}
}2.2 batch
package com.flink.sql.environment.groupBy;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableBatch {public static void main(String[] args) throws Exception {//1、批式环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);//2、连接外部文件系统,格式化方法,注册字段,临时表tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count"));//5、转化成DataStream打印在控制台DataSet<Row> result = tEnv.toDataSet(select, Row.class);result.print();}
}3、flink query sql
package com.flink.sql.environment.sqlQuery;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、sql查询Table table = tEnv.sqlQuery("select name from Orders");//4、转化成DataStream打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);rowDataStream.print();env.execute("readFileCreateTableStream");}
}4、flink table消费kafka
package com.flink.sql.environment.kafka;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadKafkaCreateTableStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new Kafka().version("universal").topic("aaaa").startFromLatest().property("bootstrap.servers", "centos:9092")).withFormat(new Csv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");//4、读取表字段Table counts = orders.select($("name"),$("age"));//5、转化成DataStream打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(counts, Row.class);rowDataStream.print();env.execute("readFileCreateTableStream");}
}5、与DataStream和DataSet API集成
可以通过将DataStream或DataSet转换为Table`,反之亦然来实现此交互。5.1将DataStream或DataSet转换为表
(1)基于tuple
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTable {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");//3、数据源映射成tuple2SingleOutputStreamOperator<Tuple2<String,String>> streamOperator = streamSource.map(new MapFunction<String, Tuple2<String,String>>() {@Overridepublic Tuple2<String,String> map(String s) throws Exception {String[] split = s.split(",");return new Tuple2<>(split[0],split[1]);}});//4、将DataStream转换为table并带有fieldsTable table = tEnv.fromDataStream(streamOperator,$("name"),$("age"));//5、table 查询Table name = table.select("name");//6、table转换成流打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);rowDataStream.print();env.execute("StreamToTable");}
}(2)基于pojo类
POJO类型的规则,如果满足以下条件,则Flink会将数据类型识别为POJO类型(并允许“按名称”字段引用):(1)该类是公共的和独立的(没有非静态内部类)
(2)该类具有公共的无参数构造函数
(3)类(和所有超类)中的所有非静态,非瞬态字段都是公共的(并且是非最终的),或者具有公共的getter和setter方法,该方法遵循针对getter和setter的Java bean命名约定。请注意:如果无法将用户定义的数据类型识别为POJO类型,则必须将其定义为GenericType并使用Kryo进行序列化。package com.flink.sql.environment.streamToTable;public class Entity {private String name;private String country;public Entity() {}public Entity(String name, String country) {this.name = name;this.country = country;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getCountry() {return country;}public void setCountry(String country) {this.country = country;}
}package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTableEntity {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");//3、数据源映射成POJOSingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {@Overridepublic Entity map(String s) throws Exception {String[] split = s.split(",");return new Entity(split[0],split[1]);}});//4、将DataStream转换为table并带有fieldsTable table = tEnv.fromDataStream(streamOperator,$("name"),$("country"));//5、table 查询Table name = table.select("name");//6、table转换成流打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);rowDataStream.print();env.execute("StreamToTablefile");}
}利用as为字段起别名package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTableEntity {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");//3、数据源映射成tuple2SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {@Overridepublic Entity map(String s) throws Exception {String[] split = s.split(",");return new Entity(split[0],split[1]);}});//4、将DataStream转换为table并带有fields,利用as起别名Table table = tEnv.fromDataStream(streamOperator,$("name").as("myDefined_name"),$("country"));//5、table 查询Table name = table.select($("myDefined_name"),$("country"));//6、table转换成流打印在控制台DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);rowDataStream.print();env.execute("StreamToTablefile");}
}5.2 将表转换为DataStream或DataSet
(1)转tuple类型
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.expressions.In;public class TableToStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING,Types.INT);DataStream<Tuple2<String, Integer>> tuple = tEnv.toAppendStream(orders, tupleType);tuple.print();env.execute("StreamToTable");}
}(2)转pojo类型
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.expressions.In;public class TableToStream {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源tEnv.connect(new FileSystem().path("D:\\test\\a.txt")).withFormat(new OldCsv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).inAppendMode().createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");DataStream<Tuple2<Boolean, POJO>> dataStream = tEnv.toRetractStream(orders, POJO.class);dataStream.print("pojo");env.execute("StreamToTablefile");}
}6、sink到外部文件系统
6.1 流式输出(外部文本系统)
package com.flink.sql.environment.outPut;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTablefile {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");//3、数据源映射成tuple2SingleOutputStreamOperator<Tuple2<String,String>> streamOperator = streamSource.map(new MapFunction<String, Tuple2<String,String>>() {@Overridepublic Tuple2<String,String> map(String s) throws Exception {String[] split = s.split(",");return new Tuple2<>(split[0],split[1]);}});//4、将DataStream转换为table并带有fieldsTable table = tEnv.fromDataStream(streamOperator,$("name"),$("age"));//5、table 查询Table name = table.select($("name"),$("age"));DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);rowDataStream.writeAsText("D:\\test\\b.txt");env.execute("StreamToTablefile");}
}6.2 table输出(外部文本系统)
package com.flink.sql.environment.outPut;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTablefile {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、文本数据源DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");//3、数据源映射成tuple2SingleOutputStreamOperator<Tuple2<String,String>> streamOperator = streamSource.map(new MapFunction<String, Tuple2<String,String>>() {@Overridepublic Tuple2<String,String> map(String s) throws Exception {String[] split = s.split(",");return new Tuple2<>(split[0],split[1]);}});//4、将DataStream转换为table并带有fieldsTable table = tEnv.fromDataStream(streamOperator,$("name"),$("age"));//5、table 查询Table name = table.select($("name"),$("age"));//6、指定外部系统tEnv.connect(new FileSystem().path("D:\\test\\b.txt")).withFormat(new Csv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.STRING())).createTemporaryTable("outPutTable");//7、执行并输出外部系统name.executeInsert("outPutTable");env.execute("StreamToTablefile");}
}7、sink到kafka
程序会报一下错误,但是不影响预期功能:Exception in thread “main” java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.package com.flink.sql.environment.outPut;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;public class SourceKafakSinkKafak {public static void main(String[] args) throws Exception {//1、流式环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2、连接外部文件系统,格式,注册字段,临时表tEnv.connect(new Kafka().version("universal").topic("aaaa").startFromLatest().property("bootstrap.servers", "centos:9092")).withFormat(new Csv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())).createTemporaryTable("Orders");//3、读取表Table orders = tEnv.from("Orders");//4、读取表字段Table select = orders.select($("name").substring(1,3), $("age"));tEnv.connect(new Kafka().version("universal").topic("bbbb").property("bootstrap.servers", "centos:9092")).withFormat(new Csv()).withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.BIGINT())).createTemporaryTable("outPut_table");select.executeInsert("outPut_table");env.execute("readFileCreateTableStream");}
}