百万数据处理导致服务器瘫痪,更改为多线程处理
创始人
2025-01-17 18:33:32
0

多线程导入百万数据

一、如何开启多线程
1.继承Thread类
public class ThreadTask extends Thread{     @Override     public void run() {         System.out.println("成功运行");     } } ​ public class ThreadDemo {     public static void main(String[] args) {         ThreadTask thread = new ThreadTask();         ThreadTask thread2 = new ThreadTask();         thread.start();         thread2.start();     } }
2.实现Runnable接口
public class RunnableTask implements Runnable{     @Override     public void run() {         System.out.println("线程开启成功");     } } ​ public class RunnableDemo {     public static void main(String[] args) {         RunnableTask task = new RunnableTask();         new Thread(task).start();         new Thread(task).start();     } }
3.实现Callable接口
public class CallableTask implements Callable {     @Override     public String call() throws Exception {         String s = "线程开启成功";         return s;     } } ​ public class CallableDemo {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CallableTask task = new CallableTask();         FutureTask futureTask = new FutureTask<>(task);         new Thread(futureTask).start();         String string = futureTask.get();         System.out.println(string);     } }
4.使用线程池
public class demo {     public static void main(String[] args) {         ExecutorService executor = Executors.newFixedThreadPool(5);         //简化写法         executor.execute(() -> System.out.println("Hello World"));     } } ​ public class demo {     public static void main(String[] args) {         ExecutorService executor = Executors.newFixedThreadPool(5);         //正常写法         executor.execute(new Runnable() {             @Override             public void run() {                 System.out.println("线程开启成功");             }         });     } }

阿里巴巴开发手册中写到不允许使用Executors开启线程池。

二、线程池介绍
1.Executor的UML图

executor包括Executor,Executors,ExecutorService,AbstractExecutorService,ScheduledExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor等。

2.Executor

Executor:一个接口,定义了一个接收Runnable对象的方法executor,里面有一个方法void execute(Runnable command);接收一个Runnable实例,用来执行一个任务。

3.ExecutorService

ExecutorService:继承了Executor接口,提供了生命周期管理的方法,任务提交返回Future对象;提供了可以关闭ExecutorService的方法void shutdown();调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

4.Executors类

Executors:提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

1.public static ExecutorService newFixedThreadPool(int nThreads) 创建固定数目线程的线程池。

2.public static ExecutorService newCachedThreadPool() 创建一个可缓存的线程池,可以根据需要自动扩展,如果有可用的空闲线程,就会重用他们,如果没有可用的线程,就会创建一个新线程。

3.public static ExecutorService newSingleThreadExecutor() 创建一个单线程的线程池,串行执行任务。

4.public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个固定大小的线程池,用于执行定时任务。

5.public static ScheduledExecutorService newSingleThreadScheduledExecutor() 创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务。

6.public static ExecutorService newWorkStealingPool(int parallelism) 创建一个工作窃取线程池,根据cpu核心数动态调整。

5.自定义线程池

线程池七个参数:

1.corePoolSize:核心线程数。

2.maximumPoolSize:最大线程数。

3,keepAliveTime:持续时间。

4.unit:时间的单位。

5.workQueue:任务队列。

6.ThreadFactory:线程的创建工厂。

7.RejectedExecutionHandler :拒绝策略。

6.拒绝策略

所有的拒绝策略都是实现了RejectedExecutionHandler接口。

常见的拒绝策略

1.AbortPolicy:抛出异常,线程池默认的拒绝策略。

2.CallerRunsPolicy:使用调用线程执行任务。

3.DiscardPolicy:直接丢弃。

4.DiscardOldestPolicy:丢弃队列最老的任务,添加新任务。

7.阻塞队列

所有的阻塞队列都是继承了BlockingQueue接口。

选择阻塞队列时根据需要选择合适的阻塞队列

1.ArrayBlockingQueue:一个有数组结构组成的有界阻塞队列。

2.LinkedBlockingQueue:由链表组成的有界(大小默认是integer.MAX_VALUE)阻塞队列。

3.DelayQueue:使用优先级队列实现的延迟无界阻塞队列。

4.PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

三、线程状态

查看源码发现线程一共有六中状态,分别是:

1.NEW:新建状态,新创建了一个线程对象,但还没有调用Start()方法。

2.RUNNABLE:运行状态,调用了start()方法,在获得CPU时间片之后变为运行状态RUNNING。

3.BLOCKED:阻塞状态。

4.WAITING:等待状态,调用了wait,join等方法,会进入等待状态。

5.TIMED_WAITING:定时等待状态,具有指定等待时间的等待线程的状态。

6.TERMINATED:终止状态,终止线程的状态,线程已经执行完成。

四、具体代码实现
业务介绍:

需要把一个100万数据的集合进行处理,插入到另一个新集合中,使用多线程。

代码实现:
public class ExecutorsDemo {     public static void main(String[] args) throws InterruptedException {         //定义一个线程池         ThreadPoolExecutor executor = new ThreadPoolExecutor(                 10,                 50,                 60,                 TimeUnit.SECONDS,                 new ArrayBlockingQueue<>(20),                 Executors.defaultThreadFactory(),                 new ThreadPoolExecutor.AbortPolicy()         );         List list = new ArrayList<>();         for (int i = 0; i < 1000000; i++) {             list.add("这是第" + i + "条");         }         long begin = System.currentTimeMillis();         System.out.println(begin);         List newList = new CopyOnWriteArrayList<>();         //计算总共多少条数据         int countNumber = list.size();         //分页页码:每次读取数据2000条         int singleThreadDealCount = 2000;         //分页页数:按照每次存储2000条数据,一共需要存储多少次,四舍五入的时候,小数后面不足5也得进一位         int threadSize = (int) Math.ceil(countNumber / singleThreadDealCount);         //定义一个计数器         CountDownLatch count = new CountDownLatch(threadSize);         Thread.State state = Thread.State.NEW;         for (int i = 0; i < threadSize; i++) {             if (executor.getQueue().size() == 20) {                 Thread.sleep(1000);             }             int finalI = i;             executor.execute(() -> {                 //声明每次从list集合截取部分的开始索引和结束索引                 int startIndex = finalI * singleThreadDealCount;                 //最后一个线程可能处理不到结尾,因为结尾索引需要设置为集合的最后一条数据                 int endIndex = (finalI == threadSize - 1) ? countNumber : (finalI + 1) * singleThreadDealCount;                 //截取集合                 List strings = list.subList(startIndex, endIndex);                 newList.addAll(strings);                 count.countDown();             });         }         count.await();         long end = System.currentTimeMillis();         System.out.println(newList.size());         System.out.println(end - begin); ​ ​     }
注意事项:

在这里使用的接收的集合一定要是线程安全的集合,使用ArrayList会有线程不安全的问题,会导致索引越界异常。这只是一个小demo,具体的代码需要结合自己的业务需求实现。

相关内容

热门资讯

黑科技好友“wepoke有没有... 黑科技好友“wepoke有没有挂”wepoke有没有挂(透明挂黑科技)本来真的是有挂(果真有挂)-哔...
总算了解!微扑克辅助测试,微扑... 总算了解!微扑克辅助测试,微扑克透牌(微扑克)真是是有挂(有挂详情)-哔哩哔哩1、进入游戏-大厅左侧...
玩家必看教程(德州ai辅助)德... 玩家必看教程(德州ai辅助)德州之星辅助器多少钱(起初是真的有挂)-哔哩哔哩;科技详细教程威信《13...
揭秘!微扑克有机器人吗,微扑克... 揭秘!微扑克有机器人吗,微扑克如何让系统发好牌(微扑克)切实是真的有挂(证实有挂)-哔哩哔哩微扑克如...
重要通知“wpk ai辅助”w... 1、重要通知“wpk ai辅助”wpk ai辅助(WPk)原先是有挂(讲解有挂)-哔哩哔哩2、进入游...
黑科技有挂“wepoke黑科技... 黑科技有挂“wepoke黑科技”wepokeai代打逻辑(透明挂黑科技)竟然真的是有挂(有挂技术)-...
记者揭秘!微扑克辅助神器,微扑... 记者揭秘!微扑克辅助神器,微扑克如何让系统发好牌(微扑克)竟然存在有挂(讲解有挂)-哔哩哔哩1、进入...
避坑细节(线上wpk德州ai机... 【福星临门,好运相随】;避坑细节(线上wpk德州ai机器人)德州ai人工智能(本然是有挂)-哔哩哔哩...
一分钟了解!“wpk到底有没有... 一分钟了解!“wpk到底有没有外挂”wpkai辅助怎么赢牌(wpK)起初真的有挂(有人有挂)-哔哩哔...
黑科技攻略“wepoke透明黑... 黑科技攻略“wepoke透明黑科技”wepoke苹果下载网址(透明挂)一直存在有挂(证实有挂)-哔哩...