电商| 物流| 科技| 创业| 经商| 运营| 科普| 财经| 文娱| AI| 物联| 品牌| 会议| 政策| 时尚| 健康| 家居| 金融| 农业| 汽车| 房产| 百科| 生活| 游戏| 管理| 快讯
 
首页 » 资讯 » 科技 » Reactive-MongoDB异步Java Driver解读

Reactive-MongoDB异步Java Driver解读

放大字体  缩小字体 时间:2020-09-18 11:35    热度:192
一、关于 异步驱动从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。但。。。

 一、关于 异步驱动

从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。

但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 MongoDB 版本。

无论如何,由于 Reactive 的发展,未来使用异步驱动应该是一个趋势。

在使用 Async Driver 之前,需要对 Reactive 的概念有一些熟悉。

二、理解 Reactive (响应式)

响应式(Reactive)是一种异步的、面向数据流的开发方式,最早是来自于.NET 平台上的 Reactive Extensions 库,随后被扩展为各种编程语言的实现。

在著名的 Reactive Manifesto(响应式宣言) 中,对 Reactive 定义了四个特征:

及时响应(Responsive):系统能及时的响应请求。   有韧性(Resilient):系统在出现异常时仍然可以响应,即支持容错。 有弹性(Elastic):在不同的负载下,系统可弹性伸缩来保证运行。 消息驱动(Message Driven):不同组件之间使用异步消息传递来进行交互,并确保松耦合及相互隔离。

在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,于是乎就有了 2013年发起的 响应式流规范(Reactive Stream Specification)。

https://www.reactive-streams.org/

其中,对于响应式流的处理环节又做了如下定义:

具有处理无限数量的元素的能力,即允许流永不结束 按序处理 异步地传递元素 实现非阻塞的负压(back-pressure)

Java 平台则是在 JDK 9 版本上发布了对 Reactive Streams 的支持。

下面介绍响应式流的几个关键接口:

Publisher

Publisher 是数据的发布者。Publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 Subscriber。

Subscriber

Subscriber 是数据的订阅者。Subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其 onSubscribe(Subscription s) 方法会被调用。

Subscription 表示的是当前的订阅关系。

当订阅成功后,可以使用 Subscription 的 request(long n) 方法来请求发布者发布 n 条数据。发布者可能产生3种不同的消息通知,分别对应 Subscriber 的另外3个回调方法。

数据通知:对应 onNext 方法,表示发布者产生的数据。

错误通知:对应 onError 方法,表示发布者产生了错误。

结束通知:对应 onComplete 方法,表示发布者已经完成了所有数据的发布。

在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知之后,不会再有其他通知产生。

Subscription

Subscription 表示的是一个订阅关系。除了之前提到的 request 方法之外,还有 cancel 方法用来取消订阅。需要注意的是,在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消。

这几个接口的关系如下图所示:

图片出处:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

MongoDB 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 Reactive Stream 的上述接口。

> 除了 reactivestream 之外,MongoDB 的异步驱动还包含 RxJava 等风格的版本,有兴趣的读者可以进一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下来,通过一个简单的例子来演示一下 Reactive 方式的代码风格:

A. 引入依赖

org.mongodb    mongodb-driver-reactivestreams    1.11.0 

> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson, mongodb-driver-async组件

B. 连接数据库

//服务器实例表List servers =newArrayList(); servers.add(newServerAddress("localhost",27018));//配置构建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//传入服务器实例 settingsBuilder.applyToClusterSettings(         builder -> builder.hosts(servers));//构建 Client 实例MongoClient mongoClient =MongoClients.create(settingsBuilder.build()); 

C. 实现文档查询

//获得数据库对象MongoDatabase database = client.getDatabase(databaseName);//获得集合MongoCollection collection = database.getCollection(collectionName);//异步返回PublisherFindPublisher publisher = collection.find();//订阅实现 publisher.subscribe(newSubscriber(){     @Override     publicvoid onSubscribe(Subscription s){         System.out.println("start...");         //执行请求         s.request(Integer.MAX_VALUE);      }     @Override     publicvoid onNext(document document){         //获得文档         System.out.println("document:"+ document.toJson());     }      @Override     publicvoid onError(Throwable t){         System.out.println("error occurs.");     }      @Override     publicvoid onComplete(){         System.out.println("finished.");     }}); 

注意到,与使用同步驱动不同的是,collection.find()方法返回的不是 Cursor,而是一个 FindPublisher对象,这是Publisher接口的一层扩展。

而且,在返回 Publisher 对象时,此时并没有产生真正的数据库IO请求。真正发起请求需要通过调用 Subscription.request()方法。

在上面的代码中,为了读取由 Publisher 产生的结果,通过自定义一个Subscriber,在onSubscribe 事件触发时就执行 数据库的请求,之后分别对 onNext、onError、onComplete进行处理。

尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想如果对于每个数据库操作都要完成一个Subscriber 逻辑,那么开发的工作量是巨大的。

为了尽可能复用重复的逻辑,可以对Subscriber的逻辑做一层封装,包含如下功能:

使用 List 容器对请求结果进行缓存 实现阻塞等待结果的方法,可指定超时时间 捕获异常,在等待结果时抛出

代码如下:

publicclassObservableSubscriberimplementsSubscriber{      //响应数据     privatefinalList received;     //错误信息     privatefinalList errors;     //等待对象     privatefinalCountDownLatch latch;     //订阅器     privatevolatileSubscription subscription;     //是否完成     privatevolatileboolean completed;      publicObservableSubscriber(){         this.received =newArrayList();         this.errors =newArrayList();         this.latch =newCountDownLatch(1);     }      @Override     publicvoid onSubscribe(finalSubscription s){         subscription = s;     }      @Override     publicvoid onNext(final T t){         received.add(t);     }      @Override     publicvoid onError(finalThrowable t){         errors.add(t);         onComplete();     }      @Override     publicvoid onComplete(){         completed =true;         latch.countDown();     }      publicSubscription getSubscription(){         return subscription;     }      publicList getReceived(){         return received;     }      publicThrowable getError(){         if(errors.size()>0){             return errors.get(0);         }         returnnull;     }      publicboolean isCompleted(){         return completed;     }           publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{         return await(timeout, unit).getReceived();     }           publicObservableSubscriber await()throwsThrowable{         return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS);     }           publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{         subscription.request(Integer.MAX_VALUE);         if(!latch.await(timeout, unit)){             thrownewMongoTimeoutException("Publisher onComplete timed out");         }         if(!errors.isEmpty()){             throw errors.get(0);         }         returnthis;     }} 

借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。

比如对于文档查询的操作可以改造如下:

ObservableSubscriber subscriber =newObservableSubscriber(); collection.find().subscribe(subscriber);//结果处理 subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{     System.out.println("document:"+ d.toJson());}); 

当然,这个例子还有可以继续完善,比如使用 List 作为缓存,则要考虑数据量的问题,避免将全部(或超量) 的文档一次性转入内存。

作者:唐卓章

华为技术专家,多年互联网研发/架设经验,关注NOSQL 中间件高可用及弹性扩展,在分布式系统架构性能优化方面有丰富的实践经验,目前从事物联网平台研发工作,致力于打造大容量高可用的物联网服务。

 

关于Reactive-MongoDB异步Java Driver解读的要点介绍,希望对大家了解Reactive-MongoDB异步Java Driver解读有所帮助,如有侵权,联系我们37442552@qq.com。
 
你可能感兴趣:
 
芬兰政府指责微软对诺基亚始乱终弃 承诺一个都

2016-05-28

本周早些时候,微软宣称它将会裁减1850个工作岗位,其中有1350个工作位于芬兰。人们认为微软裁员之举预示着该公司新手机开发工作的终结。据外电报道,芬兰政…

三星最新发布的C5酷似iPhone 6 售价只有后者一半
三星最新发布的C5酷似iPhone 6 售价只有后者一

2016-05-28 三星 C5

三星最新发布的C5酷似iPhone 6 售价只有后者一半;三星周四在中国市场发布的最新款智能手机C5酷似苹果iPhone 6和6S。

苹果下架腾讯全系产品只是虚惊一场 淘宝、京东

2016-05-29 苹果 腾讯 APP

苹果下架腾讯全系产品只是虚惊一场 淘宝、京东等APP也未能幸免;苹果下架腾讯全系产品,搜索出现大面积瘫痪,淘宝、京东等APP也未能幸免。据了解,腾讯也曾因…

华为为何要在此时向三星发起专利战?背后的原因究竟是什么?
华为为何要在此时向三星发起专利战?背后的原因

2016-05-29 华为 三星 专利

华为为何要在此时向三星发起专利战?背后的原因究竟是什么?作为中国企业的华为,其在专利,尤其是与通信相关的专利的申请和积累在全球均名列前茅。而华为之…

2016中国互联网大会时间地点主题 互联网大会有何亮点?
2016中国互联网大会时间地点主题 互联网大会有

2016-06-02 2016 中国 互联网 大会

 由中国互联网协会主办的2016(第十五届)中国互联网大会将于6月21-23日在北京国际会议中心举行。本届大会主题为“繁荣网络经济 建设网络强国”。

Facebook周四下架了突发新闻通知应用Notify
Facebook周四下架了突发新闻通知应用Notify

2016-06-04 Facebook Notify

Facebook周四下架了突发新闻通知应用Notify;Facebook发言人在发给科技博客The Verge的声明中表示,Notify采用的技术将集成到Messenger中,所以内容发布商可…

阿里回应被SEC问询 马云:那并不代表公司有问题

2016-06-04

近期,阿里巴巴接受美国证券交易委员会问询,16年来日本软银集团首度出售手中阿里股份,阿里股价震荡,相关消息持续引发关注。2

iphone7上市时间确定 国行或5288元起售

2016-06-04

根据国外网站PC-Tablet的报导称,苹果仍将下一代iPhone的发布时刻定在今年9月份,至于详细日期则为美国当地时刻9月9日或9月16日

印度最大手机厂商明年来华抢市场 有戏吗?

2016-06-04

Micromax联合创始人维卡斯贾因(VikasJain)当天在香港举办的一场科技大会上表明,公司的目标是在2020年前变成按销量核算的全球第

索尼Xperia X系列终于要来了6月8日携手周杰伦发

2016-06-04

索尼的手机一直以来都是以拍照以及颜值闻名的,在今年的MWC2016大会上,索尼曾经发布了一款Xperia X系列产品中的Xperia XPerform

 
热点图文
三星最新发布的C5酷似iPhone 6 售价只有后者一半

三星最新发布的C5酷似iPhone 6 售价只有后者一半

华为为何要在此时向三星发起专利战?背后的原因究竟是什么?

华为为何要在此时向三星发起专利战?背后的原因究竟是什么?

2016中国互联网大会时间地点主题 互联网大会有何亮点?

2016中国互联网大会时间地点主题 互联网大会有何亮点?

Facebook周四下架了突发新闻通知应用Notify

Facebook周四下架了突发新闻通知应用Notify

戴尔确认出售软件业务:4年净赔16亿美元

戴尔确认出售软件业务:4年净赔16亿美元

沉迷于成人VR的日本年轻人  年轻男女都拒绝恋爱(图)

沉迷于成人VR的日本年轻人 年轻男女都拒绝恋爱(图)

今日头条母公司字节跳动科创板上市成功几率多大?

今日头条母公司字节跳动科创板上市成功几率多大?

余承东回应:华为开发自有系统 以防美国科技巨头不授权现有系统

余承东回应:华为开发自有系统 以防美国科技巨头不授权现有系统

 
经商宝 — 经商创业营销推广电子商务门户 网站地图 | 关于我们 | 特惠服务 | 人才招聘 | 联系我们 | 法律声明