您现在的位置是:主页 > news > 做私人网站/深圳网页设计公司

做私人网站/深圳网页设计公司

admin2025/6/22 2:13:17news

简介做私人网站,深圳网页设计公司,做外贸网站维护费是多少,如何建设视频网站Avro是一种与编程语言无关的序列化格式,Avro数据通过与语言无关的schema来定义,schema通过json来描述,不过一般会使用二进制文件。Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。Avro有一个特性,当负责写消息的…

做私人网站,深圳网页设计公司,做外贸网站维护费是多少,如何建设视频网站Avro是一种与编程语言无关的序列化格式,Avro数据通过与语言无关的schema来定义,schema通过json来描述,不过一般会使用二进制文件。Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。Avro有一个特性,当负责写消息的…

Avro是一种与编程语言无关的序列化格式,Avro数据通过与语言无关的schema来定义,schema通过json来描述,不过一般会使用二进制文件。Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。Avro有一个特性,当负责写消息的应用程序使用了新的schema,负责读消息的应用程序可以继续处理消息而无需做任何改动。缺点:每条kafka记录中都嵌入了schema,这会让记录的大小成倍的增加,可以使用schema注册表将所有数据的schema保存在注册表里,然后在记录里引用schema的id。

参考书籍:《kafka权威指南》

下面就来实现一下Avro的序列化:

1.创建项目,编写pom文件:

<dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.8.2</version>
</dependency>
<dependency><groupId>com.twitter</groupId><artifactId>bijection-core_2.11</artifactId><version>0.9.6</version>
</dependency>
<dependency><groupId>com.twitter</groupId><artifactId>bijection-avro_2.11</artifactId><version>0.9.6</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.1.0</version>
</dependency>

2.创建producer,使用schema来定义格式,相当于我们定义的类,里面含有一些字段:

package avrodemo;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
//发送序列化的信息
public class AvroProducer{public static void main(String[] args)throws Exception{String schemaStr="{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";Properties props=new Properties();props.put("bootstrap.servers","192.168.184.128:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");Schema.Parser parser=new Schema.Parser();Schema schema=parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);Producer<String, byte[]> producer = new KafkaProducer<>(props);GenericRecord avroRecord = new GenericData.Record(schema);avroRecord.put("id",123);avroRecord.put("name","jack");avroRecord.put("age",18);byte[] avroRecordBytes = recordInjection.apply(avroRecord);ProducerRecord<String, byte[]> record = new ProducerRecord<>("wyh-avro-topic", avroRecordBytes);producer.send(record).get();}
}

3.创建消费者:

package avrodemo;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
//反序列化消息
public class AvroConsumer{public static void main(String[] args){Properties props=new Properties();props.put("bootstrap.servers","192.168.184.128:9092");props.put("group.id","wyh-avro-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("wyh-avro-topic"));Schema.Parser parser=new Schema.Parser();String schemaStr="{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";    Schema schema=parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvrocodecs.toBinary(schema);while(true){ConsumerRecords<String, byte[]> records = consumer.poll(100);for(ConsumerRecord<String, byte[]> record : records){GenericRecord genericRecord = recordInjection.invert(record.value()).get();systen.out.printin("value = [student.id = "+genericRecord.get("id")+",student.name = "+genericRecord.get("name")+",student.age = "+genericRecord.get("age")+"],"+"partition = "+record.partition()+",offset = "+record.offset());}}}
}

先启动消费者,再启动生产者,查看控制台:

这样就可以使用kafka提供的Avro序列化框架来传送各种类型的数据了。