先看官方文档步骤:需要一个编解码器,看源码:可见内置了需要数据类型的实现,所以发送其他消息可以发送,但是如果发送自定义对象就需要自己实现编解码逻辑了一 自定义编解码器一 自定义编解码器
/**
* 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
*/
public class CustomizeMessageCodec implements MessageCodec {

/**

* 将消息实体封装到Buffer用于传输

* 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer

*/

@Override

public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {

final ByteArrayOutputStream b = new ByteArrayOutputStream();

try (ObjectOutputStream o = new ObjectOutputStream(b)){

o.writeObject(orderMessage);

o.close();

buffer.appendBytes(b.toByteArray());

} catch (IOException e) { e.printStackTrace(); }

}

//从Buffer中获取消息对象

@Override

public OrderMessage decodeFromWire(int pos, Buffer buffer) {

final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());

OrderMessage msg = null;

try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();

} catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }

return msg;

}

//消息转换

@Override

public OrderMessage transform(OrderMessage orderMessage) {

System.out.println("消息转换---");//可对接受消息进行转换,比如转换成另一个对象等

orderMessage.setName("姚振");

return orderMessage;

}

@Override

public String name() { return "myCodec"; }

//识别是否是用户自定义编解码器,通常为-1

@Override

public byte systemCodecID() { return -1; }

public static MessageCodec create() {

return new CustomizeMessageCodec();

}
}
/**
* 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
*/
public class CustomizeMessageCodec implements MessageCodec {

/**

* 将消息实体封装到Buffer用于传输

* 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer

*/

@Override

public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {

final ByteArrayOutputStream b = new ByteArrayOutputStream();

try (ObjectOutputStream o = new ObjectOutputStream(b)){

o.writeObject(orderMessage);

o.close();

buffer.appendBytes(b.toByteArray());

} catch (IOException e) { e.printStackTrace(); }

}

//从Buffer中获取消息对象

@Override

public OrderMessage decodeFromWire(int pos, Buffer buffer) {

final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());

OrderMessage msg = null;

try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();

} catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }

return msg;

}

//消息转换

@Override

public OrderMessage transform(OrderMessage orderMessage) {

System.out.println("消息转换---");//可对接受消息进行转换,比如转换成另一个对象等

orderMessage.setName("姚振");

return orderMessage;

}

@Override

public String name() { return "myCodec"; }

//识别是否是用户自定义编解码器,通常为-1

@Override

public byte systemCodecID() { return -1; }

public static MessageCodec create() {

return new CustomizeMessageCodec();

}
}这里有一个点要注意,nam方法是必须的,且发送的时候一定要指明name二 发送消息编写二 发送消息编写
public class ProducerVerticle extends AbstractVerticle {

@Override

public void start() throws Exception {

EventBus eventBus = vertx.eventBus();

//发布消息(群发)

eventBus.publish("com.hou", "群发祝福!");

//发送消息(单发),只会发送注册此地址的一个,采用不严格的轮询算法选择

DeliveryOptions options = new DeliveryOptions();//设置消息头等

options.addHeader("some-header", "some-value");

eventBus.send("com.hou", "单发消息",options,ar->{

if(ar.succeeded()) System.out.println("收到消费者确认信息:"+ar.result().body());

});

//发送自定义对象,需要编解码器

eventBus.registerCodec(CustomizeMessageCodec.create());//注册编码器

DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必须指定名字

OrderMessage orderMessage = new OrderMessage();

orderMessage.setName("侯征");

eventBus.send("com.hou", orderMessage, options1);

}
}
public class ProducerVerticle extends AbstractVerticle {

@Override

public void start() throws Exception {

EventBus eventBus = vertx.eventBus();

//发布消息(群发)

eventBus.publish("com.hou", "群发祝福!");

//发送消息(单发),只会发送注册此地址的一个,采用不严格的轮询算法选择

DeliveryOptions options = new DeliveryOptions();//设置消息头等

options.addHeader("some-header", "some-value");

eventBus.send("com.hou", "单发消息",options,ar->{

if(ar.succeeded()) System.out.println("收到消费者确认信息:"+ar.result().body());

});

//发送自定义对象,需要编解码器

eventBus.registerCodec(CustomizeMessageCodec.create());//注册编码器

DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必须指定名字

OrderMessage orderMessage = new OrderMessage();

orderMessage.setName("侯征");

eventBus.send("com.hou", orderMessage, options1);

}
}三 接受消息Verticle编写三 接受消息Verticle编写
public class ConsumerVerticle extends AbstractVerticle {

@Override

public void start() throws Exception {

//每个Vertx实例默认是单例

EventBus eb = vertx.eventBus();

//注册处理器,消费com.hou发送的消息

MessageConsumer consumer = eb.consumer("com.hou");//订阅地址

consumer.handler(message -> {//消息处理器

if(message.body() instanceof OrderMessage){

System.out.println("接受到对象: " + ((OrderMessage) message.body()).getName());

}

System.out.println("我是普通消费者: " + message.body());

message.reply("收到了!"); // 回复生产者,send才能接受

}).completionHandler(res -> {//注册完成后通知事件,适用于集群中比较慢的情况下

System.out.println("注册处理器结果"+res.succeeded());

});

//撤销处理器

//consumer.unregister();

}
}
public class ConsumerVerticle extends AbstractVerticle {

@Override

public void start() throws Exception {

//每个Vertx实例默认是单例

EventBus eb = vertx.eventBus();

//注册处理器,消费com.hou发送的消息

MessageConsumer consumer = eb.consumer("com.hou");//订阅地址

consumer.handler(message -> {//消息处理器

if(message.body() instanceof OrderMessage){

System.out.println("接受到对象: " + ((OrderMessage) message.body()).getName());

}

System.out.println("我是普通消费者: " + message.body());

message.reply("收到了!"); // 回复生产者,send才能接受

}).completionHandler(res -> {//注册完成后通知事件,适用于集群中比较慢的情况下

System.out.println("注册处理器结果"+res.succeeded());

});

//撤销处理器

//consumer.unregister();

}
}四 注册部署Verticcle四 注册部署Verticcle
vertx.deployVerticle(ConsumerVerticle.class.getName());

TimeUnit.SECONDS.sleep(1);

vertx.deployVerticle(ProducerVerticle.class.getName());
vertx.deployVerticle(ConsumerVerticle.class.getName());

TimeUnit.SECONDS.sleep(1);

vertx.deployVerticle(ProducerVerticle.class.getName());五 测试五 测试以上就是本文的全部内容,希望对大家的学习有所帮助。
标签:

你有充足的理由选择我们

深圳上海杭州北京前端外包开发:工作10年以上的小伙伴团队,前端开发工作是我们最大的事业。所有您担心的问题,都可以写到合同里。我们会100%努力完成,直到您满意!