blockingqueue如何使用Disruptor从Ringbuffer读取

blockingqueue  时间:2021-07-12  阅读:()

java 中怎么使用queue

展开全部 Queue接口与List、Set同一级别,都是继承了Collection接口。

LinkedList实现了Queue接 口。

Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时,就完全只能访问Queue接口所定义的方法 了,而不能直接访问 LinkedList的非Queue的方法),以使得只有恰当的方法才可以使用。

BlockingQueue 继承了Queue接口。

队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具。

工作者线程可 以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们。

队列会自动平衡负载。

如果第一个线程集运行得比第二个慢,则第二个 线程集在等待结果时就会阻塞。

如果第一个线程集运行得快,那么它将等待第二个线程集赶上来。

下表显示了jdk1.5中的阻塞队列的操作: add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常 offer 添加一个元素并返回true 如果队列已满,则返回false poll 移除并返问队列头部的元素 如果队列为空,则返回null peek 返回队列头部的元素 如果队列为空,则返回null put 添加一个元素 如果队列满,则阻塞 take 移除并返回队列头部的元素 如果队列为空,则阻塞 remove、element、offer 、poll、peek 其实是属于Queue接口。

阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。

当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。

这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。

java.util.concurrent的队列

java.util.concurrent ConcurrentLinkedQueue 类提供了高效的、可伸缩的、线程安全的非阻塞 FIFO 队列。

java.util.concurrent 中的五个实现都支持扩展的 BlockingQueue 接口,该接口定义了 put 和 take 的阻塞版本:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、PriorityBlockingQueue 和 DelayQueue。

这些不同的类覆盖了生产者-使用者、消息传递、并行任务执行和相关并发设计的大多数常见使用的上下文。

java中的queue类是什么,啥作用?

java5中新增加了java.util.Queue接口,用以支持队列的常见操作。

该接口扩展了java.util.Collection接口。

Queue使用时要尽量避免Collection的add()和remove()方法,而是要使用offer()来加入元素,使用poll()来获取并移出元素。

它们的优 点是通过返回值可以判断成功与否,add()和remove()方法在失败的时候会抛出异常。

如果要使用前端而不移出该元素,使用 element()或者peek()方法。

值得注意的是LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。

小例子: /** * * @author Zang XT */ import java.util.Queue; import java.util.LinkedList; public class TestQueue { public static void main(String[] args) { Queue<String> queue = new LinkedList<String>(); queue.offer("Hello"); queue.offer("World!"); queue.offer("你好!"); System.out.println(queue.size()); String str; while((str=queue.poll())!=null){ System.out.print(str); } System.out.println(); System.out.println(queue.size()); } } offer,add区别: 一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。

这时新的 offer 方法就可以起作用了。

它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。

poll,remove区别: remove() 和 poll() 方法都是从队列中删除第一个元素(head)。

remove() 的行为与 Collection 接口的版本相似, 但是新的 poll() 方法在用空集合调用时不是抛出异常,只是返回 null。

因此新的方法更适合容易出现异常条件的情况。

peek,element区别: element() 和 peek() 用于在e69da5e887aa62616964757a686964616f31333332613666队列的头部查询元素。

与 remove() 方法类似,在队列为空时, element() 抛出一个异常,而 peek() 返回 null。

处理InterruptedException,捕捉到它,然后怎么处理它? 详细??

您不能忽略这个异常, 因为它是一个检查异常(checked excepti on) 。

但是应该如何处理它呢? 在本月的 Java 理论与实践中, 并发专家 Bri an Goetz 将解释 I nterruptedExcepti on 的含义, 为什么会抛出 I nterruptedExcepti on, 以及在捕捉到该异常时应该怎么做。

这样的情景您也许并不陌生: 您在编写一个测试程序, 程序需要暂停一段时间, 于是调用 Thread. sleep( ) 。

但是编译器或 I DE 报错说没有处理检查到的 InterruptedException 。

InterruptedException 是什么呢, 为什么必须处理它? 对于 InterruptedException , 一种常见的处理方式是 “生吞(swal l ow) ” 它 —— 捕捉它, 然后什么也不做(或者记录下它, 不过这也好不到哪去) —— 就像后面的 清单 4 一样。

不幸的是, 这种方法忽略了这样一个事实: 这期间可能发生中断, 而中断可能导致应用程序丧失及时取消活动或关闭的能力。

当一个方法抛出 InterruptedException 时, 它不仅告诉您它可以抛出一个特定的检查异常, 而且还告诉您其他一些事情。

例如, 它告诉您它是一个阻塞(blocking) 方法, 如果您响应得当的话, 它将尝试消除阻塞并尽早返回。

阻塞方法不同于一般的要运行较长时间的方法。

一般方法的完成只取决于它所要做的事情,以及是否有足够多可用的计算资源(CPU 周期和内存) 。

而阻塞方法的完成还取决于一些外部的事件, 例如计时器到期, I /O 完成, 或者另一个线程的动作(释放一个锁, 设置一个标志, 或者将一个任务放在一个工作队列中) 。

一般方法在它们的工作做完后即可结束,而阻塞方法较难于预测, 因为它们取决于外部事件。

阻塞方法可能影响响应能力, 因为难于预测它们何时会结束。

阻塞方法可能因为等不到所等的事件而无法终止, 因此令阻塞方法可取消 就非常有用(如果长时间运行的非阻塞方法是可取消的, 那么通常也非常有用) 。

可取消操作是指能从外部使之在正常完成之前终止的操作。

由 Thread 提供并受 Thread. sleep( ) 和 Obj ect. wait( ) 支持的中断机制就是一种取消机制; 它允许一个线程请求另一个线程停止它正在做的事情。

当一个方法抛出 InterruptedException 时, 它是在告诉您, 如果执行该方法的线程被中断, 它将尝试停止它正在做的事情而提前返回, 并通过抛出 InterruptedException 表明它提前返回。

行为良好的阻塞库方法应该能对中断作出响应并抛出 InterruptedException , 以便能够用于可取消活动中, 而不至于影响响应。

每个线程都有一个与之相关联的 Bool ean 属性, 用于表示线程的中断状态(interrupted status) 。

中断状态初始时为 fal se; 当另一个线程通过调用 Thread. interrupt( ) 中断一个线程时, 会出现以下两种情况之一。

如果那个线程在执行一个低级可中断阻塞方法, 例如 Thread. sleep( ) 、 Thread. j oin( ) 或 Obj ect. wait( ) , 那么它将取消阻塞并抛出 InterruptedException 。

否则, interrupt( ) 只是设置线程的中断状态。

在被中断线程中运行的代码以后可以轮询中断状态, 看看它是否被请求停止正在做的事情。

中断状态可以通过 Thread. isInterrupted( ) 来读取, 并且可以通过一个名为 Thread. interrupted( ) 的操作读取和清除。

中断是一种协作机制。

当一个线程中断另一个线程时, 被中断的线程不一定要立即停止正在做的事情。

相反, 中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情。

有些方法, 例如 Thread. sleep( ) , 很认真地对待这样的请求, 但每个方法不是一定要对中断作出响应。

对于中断请求, 不阻塞但是仍然要花较长时间执行的方法可以轮询中断状态, 并在被中断的时候提前返回。

您可以随意忽略中断请求, 但是这样做的话会影响响应。

中断的协作特性所带来的一个好处是, 它为安全地构造可取消活动提供更大的灵活性。

我们很少希望一个活动立即停止; 如果活动在正在进行更新的时候被取消, 那么程序数据结构可能处于不一致状态。

中断允许一个可取消活动来清理正在进行的工作, 恢复不变量, 通知其他活动它要被取消, 然后才终止。

InterruptedExcepti on 如果抛出 InterruptedException 意味着一个方法是阻塞方法, 那么调用一个阻塞方法则意味着您的方法也是一个阻塞方法, 而且您应该有某种策略来处理 InterruptedException 。

通常最容易的策略是自己抛出 InterruptedException ,如清单 1 中 putTask( ) 和 getTask( ) 方法中的代码所示。

这样做可以使方法对中断作出响应, 并且只需将 InterruptedException 添加到 throws 子句。

1 . InterruptedExcepti on publ i c cl ass TaskQueue { pri vate stati c fi nal i nt MAX_TASKS = 1000; pri vate Bl ocki ngQueue queue = new Li nkedBl ocki ngQueue(MAX_TASKS) ; publ i c voi d putTask(Task r) { queue. put(r) ; } publ i c Task getTask() { return queue. take() ; } } 有时候需要在传播异常之前进行一些清理工作。

在这种情况下, 可以捕捉 InterruptedException , 执行清理, 然后抛出异常。

清单 2 演示了这种技术, 该代码是用于匹配在线游戏服务中的玩家的一种机制。

matchPlayers( ) 方法等待两个玩家到来, 然后开始一个新游戏。

如果在一个玩家已到来, 但是另一个玩家仍未到来之际该方法被中断, 那么它会将那个玩家放回队列中, 然后重新抛出 InterruptedException , 这样那个玩家对游戏的请求就不至于丢失。

2. InterruptedExcepti on publ i c cl ass Pl ayerMatcher { pri vate Pl ayerSource pl ayers; publ i c Pl ayerMatcher(Pl ayerSource pl ayers) { thi s. pl ayers = pl ayers; } publ i c voi d matchPl ayers() { try { Pl ayer pl ayerOne, pl ayerTwo; whi l e (true) { pl ayerOne = pl ayerTwo = nul l ; // Wai t for two pl ayers to arri ve and start a new game pl ayerOne = pl ayers. wai tForPl ayer() ; // coul d throw IE pl ayerTwo = pl ayers. wai tForPl ayer() ; // coul d throw IE startNewGame(pl ayerOne, pl ayerTwo) ; } } catch (InterruptedExcepti on e) { // If we got one pl ayer and were i nterrupted, put that pl ayer back i f (pl ayerOne ! = nul l ) pl ayers. addFi rst(pl ayerOne) ; // Then propagate the excepti on } } } 有时候抛出 InterruptedException 并不合适, 例如当由 Runnable 定义的任务调用一个可中断的方法时, 就是如此。

在这种情况下, 不能重新抛出 InterruptedException , 但是您也不想什么都不做。

当一个阻塞方法检测到中断并抛出 InterruptedException 时, 它清除中断状态。

如果捕捉到 InterruptedException 但是不能重新抛出它, 那么应该保留中断发生的证据, 以便调用栈中更高层的代码能知道中断, 并对中断作出响应。

该任务可以通过调用 interrupt( ) 以 “重新中断” 当前线程来完成, 如清单 3 所示。

至少, 每当捕捉到 InterruptedException 并且不重新抛出它时, 就在返回之前重新中断当前线程。

3. InterruptedExcepti on publ i c cl ass TaskRunner i mpl ements Runnabl e { pri vate Bl ocki ngQueue queue; publ i c TaskRunner(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { whi l e (true) { Task task = queue. take(10, Ti meUni t. SECONDS) ; task. execute() ; } } catch (InterruptedExcepti on e) { // Restore the i nterrupted status } } } 处理 InterruptedException 时采取的最糟糕的做法是生吞它 —— 捕捉它, 然后既不重新抛出它, 也不重新断言线程的中断状态。

对于不知如何处理的异常, 最标准的处理方法是捕捉它, 然后记录下它, 但是这种方法仍然无异于生吞中断, 因为调用栈中更高层的代码还是无法获得关于该异常的信息。

(仅仅记录 InterruptedException 也不是明智的做法, 因为等到人来读取日志的时候, 再来对它作出处理就为时已晚了。

) 清单 4 展示了一种使用得很广泛的模式, 这也是生吞中断的一种模式: 4. // Don' t do thi s publ i c cl ass TaskRunner i mpl ements Runnabl e { pri vate Bl ocki ngQueue queue; publ i c TaskRunner(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { whi l e (true) { Task task = queue. take(10, Ti meUni t. SECONDS) ; task. execute() ; } } catch (InterruptedExcepti on swal l owed) { /* DON' T DO THIS - RESTORE THE INTERRUPTED STATUS INSTEAD */ } } } 如果不能重新抛出 InterruptedException , 不管您是否计划处理中断请求, 仍然需要重新中断当前线程, 因为一个中断请求可能有多个 “接收者” 。

标准线程池( ThreadPoolExecutor ) worker 线程实现负责中断, 因此中断一个运行在线程池中的任务可以起到双重效果, 一是取消任务, 二是通知执行线程线程池正要关闭。

如果任务生吞中断请求, 则 worker 线程将不知道有一个被请求的中断, 从而耽误应用程序或服务的关闭。

语言规范中并没有为中断提供特定的语义, 但是在较大的程序中, 难于维护除取消外的任何中断语义。

取决于是什么活动, 用户可以通过一个 GUI 或通过网络机制, 例如 JMX 或 Web 服务来请求取消。

程序逻辑也可以请求取消。

例如, 一个 Web 爬行器(crawl er) 如果检测到磁盘已满, 它会自动关闭自己, 否则一个并行算法会启动多个线程来搜索解决方案空间的不同区域, 一旦其中一个线程找到一个解决方案, 就取消那些线程。

仅仅因为一个任务是可取消的, 并不意味着需要立即 对中断请求作出响应。

对于执行一个循环中的代码的任务, 通常只需为每一个循环迭代检查一次中断。

取决于循环执行的时间有多长, 任何代码可能要花一些时间才能注意到线程已经被中断(或者是通过调用 Thread. isInterrupted( ) 方法轮询中断状态, 或者是调用一个阻塞方法) 。

如果任务需要提高响应能力, 那么它可以更频繁地轮询中断状态。

阻塞方法通常在入口就立即轮询中断状态, 并且, 如果它被设置来改善响应能力, 那么还会抛出 InterruptedException 。

惟一可以生吞中断的时候是您知道线程正要退出。

只有当调用可中断方法的类是 Thread 的一部分, 而不是 Runnable 或通用库代码的情况下, 才会发生这样的场景, 清单 5 演示了这种情况。

清单 5 创建一个线程, 该线程列举素数, 直到被中断, 这里还允许该线程在被中断时退出。

用于搜索素数的循环在两个地方检查是否有中断: 一处是在 whi l e 循环的头部轮询 isInterrupted( ) 方法, 另一处是调用阻塞方法 BlockingQueue. put( ) 。

5. publ i c cl ass Pri meProducer extends Thread { pri vate fi nal Bl ocki ngQueue queue; Pri meProducer(Bl ocki ngQueue queue) { thi s. queue = queue; } publ i c voi d run() { try { Bi gInteger p = Bi gInteger. ONE; whi l e (! Thread. currentThread() . i sInterrupted() ) queue. put(p = p. nextProbabl ePri me() ) ; } catch (InterruptedExcepti on consumed) { /* Al l ow thread to exi t */ } } publ i c voi d cancel () { i nterrupt() ; } } 并非所有的阻塞方法都抛出 InterruptedException 。

输入和输出流类会阻塞等待 I /O 完成, 但是它们不抛出 InterruptedException , 而且在被中断的情况下也不会提前返回。

然而, 对于套接字 I /O, 如果一个线程关闭套接字, 则那个套接字上的阻塞 I /O 操作将提前结束, 并抛出一个 SocketException 。

j ava. nio 中的非阻塞 I /O 类也不支持可中断 I /O, 但是同样可以通过关闭通道或者请求 Selector 上的唤醒来取消阻塞操作。

类似地, 尝试获取一个内部锁的操作(进入一个 synchronized 块) 是不能被中断的, 但是 ReentrantLock 支持可中断的获取模式。

有些任务拒绝被中断, 这使得它们是不可取消的。

但是, 即使是不可取消的任务也应该尝试保留中断状态, 以防在不可取消的任务结束之后, 调用栈上更高层的代码需要对中断进行处理。

清单 6 展示了一个方法, 该方法等待一个阻塞队列, 直到队列中出现一个可用项目,而不管它是否被中断。

为了方便他人, 它在结束后在一个 fi nal l y 块中恢复中断状态, 以免剥夺中断请求的调用者的权利。

(它不能在更早的时候恢复中断状态, 因为那将导致无限循环 —— BlockingQueue. take( ) 将在入口处立即轮询中断状态, 并且, 如果发现中断状态集, 就会抛出 InterruptedException 。

) 6. publ i c Task getNextTask(Bl ocki ngQueue queue) { bool ean i nterrupted = fal se; try { whi l e (true) { try { return queue. take() ; } catch (InterruptedExcepti on e) { i nterrupted = true; // fal l through and retry } } } fi nal l y { i f (i nterrupted) Thread. currentThread() . i nterrupt() ; } } 您可以用 Java 平台提供的协作中断机制来构造灵活的取消策略。

各活动可以自行决定它们是可取消的还是不可取消的, 以及如何对中断作出响应, 如果立即返回会危害应用程序完整性的话, 它们还可以推迟中断。

BlockingQueue可用于什么场合

BlockingQueue是一个由数组支持的有界阻塞队列,也就是说当一个线程向一个固定大小的BlockingQueue队列里面不停地存放数据,另一个线程不停的向这个队列里面取数据,如果队列满了,还继续存放数据,此时出现阻塞,直到队列有空闲的位置;反之,如果队列为空,还继续取数据,则会出现阻塞,直到队列中有数据为止

如何使用Disruptor从Ringbuffer读取

它可以用来替代队列,同时有很多SEDA和Actors模式的特性。

和队列比较:Disruptor可以向其他线程发送消息,并在需要的时候唤醒其他线程(和BlockingQueue相似)。

不过,他们之间有三个主要的区别。

2. 把消息放入Disruptor需要2个步骤

VirMach(8元/月)KVM VPS,北美、欧洲

VirMach,成立于2014年的美国IDC商家,知名的低价便宜VPS销售商,支持支付宝、微信、PayPal等方式付款购买,主打美国、欧洲暑假中心产品,拥有包括洛杉矶、西雅图、圣何塞、凤凰城在内的11个数据中心可以选择,可以自由搭配1Gbps、2Gbps、10Gbps带宽端口,有Voxility DDoS高防IP可以选择(500Gbps以上的防御能力),并且支持在控制面板付费切换机房和更换IP(带...

#消息# contabo:德国老牌机房新增美国“纽约、西雅图”数据中心,免设置费

运作了18年的德国老牌机房contabo在继去年4月开办了第一个美国数据中心(中部城市:圣路易斯)后立马在本月全新上马两个数据中心:纽约、西雅图。当前,为庆祝美国独立日,美国三个数据中心的VPS全部免除设置费,VPS本身的配置很高,价格适中,有较高的性价比!官方网站:https://contabo.com/en/SSD VPSKVM虚拟,纯SSD阵列,不限制流量,自带一个IPv4内存CPUSSD带...

Digital-VM暑期全场六折优惠,8个机房

Digital-VM商家目前也在凑热闹的发布六月份的活动,他们家的机房蛮多的有提供8个数据中心,包括日本、洛杉矶、新加坡等。这次六月份的促销活动全场VPS主机六折优惠。Digital-VM商家还是有一点点特点的,有提供1Gbps和10Gbps带宽的VPS主机,如果有需要大带宽的VPS主机可以看看。第一、商家优惠码优惠码:June40全场主机六折优惠,不过仅可以月付、季付。第二、商家VPS主机套餐1...

blockingqueue为你推荐
pexelszatchels中文是什么意思云图片云相册是什么意思丁奇赛尔号丁奇技能表,带等级,刷什么学习力好?flowplayerswfobject.js这能不能播放音频啊(amr)memsql易语言的msql连接怎么不成功,错哪呢?新手怎么制作表格如何学会制作表格?菜霸现实中遇到地痞流氓该怎么办sungard李庆星老板咋样 我准备去CDMC上海决策者经济顾问公司上班了网游加速小助手QQ网游加速小助手怎么用网络管理员教程网络管理从零基础开始学习,要怎么学啊
中国万网域名 smartvps 瓦工 awardspace hawkhost kdata godaddy优惠券 info域名 云主机51web 建站代码 京东商城0元抢购 godaddy域名证书 softbank邮箱 ntfs格式分区 什么是服务器托管 免费智能解析 网通服务器托管 ftp免费空间 metalink 无限流量 更多