- 浏览: 15981 次
- 性别:
- 来自: 杭州
最新评论
-
richard_lee:
smallbug_vip 写道spiniper 写 ...
java异步计算场景应用 -
smallbug_vip:
spiniper 写道我只想问一句楼主,你使用的多线程处理和你 ...
java异步计算场景应用 -
spiniper:
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决 ...
java异步计算场景应用
最近项目中遇到一个业务场景:
将当期数据库中的表迁移到另外一个数据库中,为满足迁移效率需要进行并发数据迁移。对每一数据表可以启动不同的线程同时迁移数据。迁移完成后,同步更新对应该迁移任务的状态字段。
最先想到的是使用java中并发工具类:同步屏障CyclicBarrier。
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
一、通过CyclicBarrier实现迁移任务代码:
业务处理代码:
测试入口:
通过上述实现步骤,完全可以实现业务场景。
增强业务场景:在上述场景基础上,对每次迁移的结果进行最终的汇总。多少迁移成功,多少迁移失败。也就是对每个线程处理结果进行汇总。
这个就涉及到线程间通信的问题。在现有处理的基础上,添加一个公共List变量,在迁移VerticaTransferTask run()方法中将迁移结果synchronized放在List
中即可。
但是,有没有更好的实现方式呢?
Future接口
描述:从Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
这就表示我们可以通过Future获取每个线程的执行结果。我以下通过并行计算产品利润的方式简单实现需求。
二、通过Future实现并行处理任务代码:
业务处理代码:
很简单我们就实现了并行计算并合并结果集。
那我能不能两者一起使用呢,我在CyclicBarrier处理结果DoAfter类中获取Future结果进行统计。
这样不就可以满足需求了吗。设想处理如下:
业务处理VerticaTransferTask:
运行后发现死锁啦,原因是什么呢?
查了一下CyclicBarrier资料,注意这一点:
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行barrierAction。
也就是在barrier.await()执行之后会优先执行DoAfter类中的run, 而这时run中的 future.get()阻塞等待VerticaTransferTask call运行结果,形成了资源相互
抢占,造成了死锁。
这样我们就大概了解了在java中有两种实现并行计算的方式,那么具体遇到问题的时候如何选择呢?
我们还是要清楚两者的概念:
CyclicBarrier在到达屏障之后线程并没有处理结束,而是被阻塞等待,等有优先处理barrierAction完成后,被signalAll唤醒继续运行。
CyclicBarrier中的源代码:
而Future是等待线程运行完成之后才获取结果,否则一直阻塞等待。
附该Demo代码
现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。
关于并发这块你说的对。即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒(ms)。到多核时代,并发编程就更显示他的优势。更不用说对现在互联网时代高并发的业务支撑了。
至于Thread.sleep(1000);这块是我加的模拟业务时间(当然实际时间不止1s)
现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。
将当期数据库中的表迁移到另外一个数据库中,为满足迁移效率需要进行并发数据迁移。对每一数据表可以启动不同的线程同时迁移数据。迁移完成后,同步更新对应该迁移任务的状态字段。
最先想到的是使用java中并发工具类:同步屏障CyclicBarrier。
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
一、通过CyclicBarrier实现迁移任务代码:
package com.future.test; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class VerticaTransfer extends DataTransfer<DataInfo>{ int threadCount = 10; //线程调度 ExecutorService executor = null; CyclicBarrier barrier; //计算结果集 protected void doBefore(DataInfo entity){ //线程池 executor = Executors.newFixedThreadPool(threadCount); //CyclicBarrier可以用于多线程计算数据,最后处理结果的场景 barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity)); } protected void doJob(DataInfo entity){ //并行计算 List<Product> ps = entity.getProducts(); for (Product product : ps) { executor.execute(new VerticaTransferTask(barrier,product)); } } @Override protected void doAfter(DataInfo entity) { } } /** * 合并计算处理 * @author Administrator * */ class DoAfter implements Runnable { private VerticaTransfer verticaTransfer; private DataInfo entity; DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) { this.verticaTransfer = verticaTransfer; this.entity = entity; } public void run() { System.out.println("迁移完成。共迁移:" + entity.getProducts().size()); } }
业务处理代码:
package com.future.test; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 数据迁移执行任务 * @author * */ public class VerticaTransferTask implements Runnable{ private CyclicBarrier barrier; private Product product; VerticaTransferTask(Product product){ this.product = product; } VerticaTransferTask(CyclicBarrier barrier,Product product){ this.barrier = barrier; this.product = product; } @Override public void run() { // TODO Auto-generated method stub try { System.out.println("进行迁移 :" + product.getId()); Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } finally { try { barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
测试入口:
package com.future.test; import java.util.ArrayList; import java.util.List; public class VerticaTransferTest{ public static void main(String[] args) { VerticaTransfer transfer = new VerticaTransfer(); // DataInfo data = new DataInfo(); List<Product> ps = new ArrayList<Product>(); int tmp = 0; for(int i = 0; i < 10;i++){ Product p = new Product(); p.setId(i + ""); p.setPurchase_price(10); p.setSalse_price(10 + i); ps.add(p); tmp += i; } data.setProducts(ps); transfer.execute(data); } }
通过上述实现步骤,完全可以实现业务场景。
增强业务场景:在上述场景基础上,对每次迁移的结果进行最终的汇总。多少迁移成功,多少迁移失败。也就是对每个线程处理结果进行汇总。
这个就涉及到线程间通信的问题。在现有处理的基础上,添加一个公共List变量,在迁移VerticaTransferTask run()方法中将迁移结果synchronized放在List
中即可。
但是,有没有更好的实现方式呢?
Future接口
描述:从Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
这就表示我们可以通过Future获取每个线程的执行结果。我以下通过并行计算产品利润的方式简单实现需求。
二、通过Future实现并行处理任务代码:
package com.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class VerticaTransfer extends DataTransfer<DataInfo>{ int threadCount = 10; //线程调度 ExecutorService executor = null; //计算结果集 List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>(); protected void doBefore(DataInfo entity){ //线程池 executor = Executors.newFixedThreadPool(threadCount); } protected void doJob(DataInfo entity){ //并行计算 List<Product> ps = entity.getProducts(); for (Product product : ps) { Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product)); results.add(res); } } @Override protected void doAfter(DataInfo entity) { double total = 0; List<Future<ResultInfo>> rs = this.results; for (Future<ResultInfo> future : rs) { try { ResultInfo info = future.get(); total += info.getPrice(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("产品总利润:" + total); } }
业务处理代码:
package com.test; import java.util.concurrent.Callable; /** * 数据迁移执行任务 * @author * */ public class VerticaTransferTask implements Callable<ResultInfo>{ private Product product; VerticaTransferTask(Product product){ this.product = product; } @Override public ResultInfo call() throws Exception { // TODO Auto-generated method stub ResultInfo res = null; try { double money = product.getSalse_price() - product.getPurchase_price(); res = new ResultInfo(); res.setPrice(money); res.setProductId(product.getId()); Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } return res; } }
很简单我们就实现了并行计算并合并结果集。
那我能不能两者一起使用呢,我在CyclicBarrier处理结果DoAfter类中获取Future结果进行统计。
这样不就可以满足需求了吗。设想处理如下:
public class VerticaTransfer extends DataTransfer<DataInfo>{ int threadCount = 10; //线程调度 ExecutorService executor = null; CyclicBarrier barrier; //计算结果集 List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>(); protected void doBefore(DataInfo entity){ //线程池 executor = Executors.newFixedThreadPool(threadCount); //CyclicBarrier可以用于多线程计算数据,最后处理结果的场景 barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity)); } protected void doJob(DataInfo entity){ //并行计算 List<Product> ps = entity.getProducts(); for (Product product : ps) { Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product)); results.add(res); } } @Override protected void doAfter(DataInfo entity) { } } /** * 合并计算处理 * @author Administrator * */ class DoAfter implements Runnable { private VerticaTransfer verticaTransfer; private DataInfo entity; DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) { this.verticaTransfer = verticaTransfer; this.entity = entity; } public void run() { double total = 0; List<Future<ResultInfo>> rs = verticaTransfer.results; for (Future<ResultInfo> future : rs) { try { ResultInfo info = future.get(); total += info.getPrice(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("产品总利润:" + total); } }
业务处理VerticaTransferTask:
public class VerticaTransferTask implements Callable<ResultInfo>{ private CyclicBarrier barrier; private Product product; VerticaTransferTask(Product product){ this.product = product; } VerticaTransferTask(CyclicBarrier barrier,Product product){ this.barrier = barrier; this.product = product; } @Override public ResultInfo call() { // TODO Auto-generated method stub ResultInfo res = null; try { double money = product.getSalse_price() - product.getPurchase_price(); res = new ResultInfo(); res.setPrice(money); res.setProductId(product.getId()); Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } finally { try { barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return res; } }
运行后发现死锁啦,原因是什么呢?
查了一下CyclicBarrier资料,注意这一点:
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行barrierAction。
也就是在barrier.await()执行之后会优先执行DoAfter类中的run, 而这时run中的 future.get()阻塞等待VerticaTransferTask call运行结果,形成了资源相互
抢占,造成了死锁。
这样我们就大概了解了在java中有两种实现并行计算的方式,那么具体遇到问题的时候如何选择呢?
我们还是要清楚两者的概念:
CyclicBarrier在到达屏障之后线程并没有处理结束,而是被阻塞等待,等有优先处理barrierAction完成后,被signalAll唤醒继续运行。
CyclicBarrier中的源代码:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
而Future是等待线程运行完成之后才获取结果,否则一直阻塞等待。
附该Demo代码
- Demo.zip (3.5 KB)
- 下载次数: 24
评论
3 楼
richard_lee
2016-02-17
smallbug_vip 写道
spiniper 写道
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。
关于并发这块你说的对。即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒(ms)。到多核时代,并发编程就更显示他的优势。更不用说对现在互联网时代高并发的业务支撑了。
至于Thread.sleep(1000);这块是我加的模拟业务时间(当然实际时间不止1s)
2 楼
smallbug_vip
2016-02-17
spiniper 写道
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。
1 楼
spiniper
2016-02-16
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。
发表评论
-
java并发计算场景应用
2016-02-03 09:35 11最近项目中遇到一个业务场景: 将当期数据库中的表迁移到另外一个 ... -
两个线程交替打印数字-Condition唤醒与等待
2015-10-18 19:24 4073看到一道关于多线程题:如何实现两个线程交替打印数字0-9? ... -
并发锁及java.util.concurrent包浅析
2015-02-28 19:21 1007我们在处理并发操作的时候经常使用锁机制,大家常用的sy ... -
由一个CUP占用率过高的问题去理解Java垃圾回收机制
2014-12-25 22:29 1703最近我们的高并发平台遇到用户高峰运行时总会出现CPU占用率过高 ... -
Java NIO实战
2014-12-24 22:17 955最近参与实现底 ...
相关推荐
java 异步http和websocket客户端 java 异步http和websocket客户端 java 异步http和websocket客户端 java 异步http和websocket客户端 java 异步http和websocket客户端 java 异步http和websocket客户端 java 异步http...
java文件异步上传
java异步通信例子 java NIO 异步socket
java异步技术原理和实践基础分析。
JAVA 7 提供异步通信的很多类,可以编写高性能服务器程序,防止阻塞,节约CPU资源,以上就是NIO2教程,比NIO更上一个档次!
AsyncClientHttpExchangeStreaming类是基于HttpAsyncClients的异步请求代码
Vert.x最大的特点就在于异步(底层基于Netty),通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,大大降低了传统阻塞模型中线程对于操作系统的开销。因此相比较传统的阻塞模型,异步...
一.能适应不同类型的请求: 二.能同时并发处理多个请求,并能按一定机制调度: 用一个队列来存放请求,所以只能按FIFO机制调度,你可以改用LinkedList,就可以简单实现一个优先级(优先级高的addFirst,低的addLast). ...
Java_异步消息处理
java实现多文件异步上传
Java http异步加载网络图片,没有第三方包,都是Java原始类
异步计算Java 中异步计算与 Future 的演示
java写的异步文件发送程序,由于win7带外数据的发送存在问题无法作连通性测试,因此本程序只做了简单的isReachable()连通性测试。程序采用了文件长度->文件名字->文件内容的顺序发送方式,没有对非文件进行甄别.
MySQL和PostgreSQL的Java异步数据库驱动程序,用Kotlin编写
这个是一款 Unity3D异步加载场景资源 ,界面简洁漂亮的加载画面,内含多个样式,提供给广大的Unity3D学者研究。
java 异步server 可多client
避免了跳转场景时界面卡住不动的尴尬,使用时只需把scene0放入游戏的第一个场景即可,跳转场景时直接调用ChangeSceneManager.Instance.ChangeScene()方法即可进入动态加载场景界面,无需再创建跳转场景的UI。...
Java异步编程框架Promise的介绍,对于理解java异步编程有哦帮助。
Java多线程实现异步调用实例。运行Main可以看到结果。main是主线程,另有A,B,C三个线程用不同的时间跑完。
异步编排.java