多线程并发 多线程 - Java教程 - 廖雪峰的官方网站
在批量插入假用户的时候需要插入1000条数据,通过for循环线性插入时间效率太低了。
所以引入多线程来实现并发。
1 Service.saveBatch(userList,10000);
在插入数据库时可以用service自带的saveBatch来实现
在创建新数据并导入时也可以用多线程去创建。
实现方法分n组去创建将每组创建得到的userList去保存到数据库。n组数据并发执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 StopWatch stopWatch = new StopWatch (); final Long MAX_NUMBER = 100000L ; stopWatch.start(); List<CompletableFuture<Void>> futureList = new ArrayList <>(); for (int i=0 ;i<10 ;i++) { List<User> userList = Collections.synchronizedList(new ArrayList <>()); CompletableFuture<Void> future = CompletableFuture.runAsync(()-> { for (int j = 0 ; j < MAX_NUMBER; j++) { User user = new User (); user.setUsername("zh" + j); user.setUserAccount(Long.toString(MyCommonUtils.randNumber(10 ))); user.setAvatarUrl("../../src/img/avatar/mari.jpg" ); user.setGender((MyCommonUtils.randNumber(2 ) % 2 == 0 ? 1 : 0 )); user.setUserPassword("123456789" ); user.setPhone(Long.toString(MyCommonUtils.randNumber(10 ))); user.setEmail(Long.toString(MyCommonUtils.randNumber(10 )) + "@qq.com" ); user.setUserStatus(0 ); user.setIsDelete(0 ); user.setUserRole(0 ); user.setUserCode("123456789" ); user.setTags("[]" ); user.setProfile("这个人很懒" ); userList.add(user); } userService.saveBatch(userList,10000 ); } ); futureList.add(future); } CompletableFuture.allOf(futureList.toArray(new CompletableFuture []{})).join(); stopWatch.stop(); System.out.println(stopWatch.getTotalTimeMillis());
优化-> 既然能分成n组那么就可以分成n+1,n+2…n+x组所以可以利用二分的思想将其分到你想要的大小进行分块多线程并发。
用到了forkjoin
1 2 3 4 5 6 StopWatch stopWatch1 = new StopWatch ();stopWatch1.start(); ForkJoinTask<Integer> task = new AddUser (1 ,100000 ); ForkJoinPool.commonPool().invoke(task); stopWatch1.stop(); System.out.println(stopWatch1.getTotalTimeMillis());
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.example.demo.utils;import com.example.demo.model.domain.User;import com.example.demo.service.UserService;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.RecursiveTask;@Component public class AddUser extends RecursiveTask <Integer> { @Resource private UserService userService; public static AddUser addUser; @PostConstruct public void init () { addUser = this ; } static final int THRESHOLD = 500 ; int start; int end; public AddUser () { } public AddUser (int start, int end) { this .start = start; this .end = end; } @Override protected Integer compute () { if (start>end) return 0 ; if (end - start <= THRESHOLD) { List<User>userList = Collections.synchronizedList(new ArrayList <>()); for (int i=0 ;i<=end-start;i++){ User user = new User (); user.setUsername("zh" + start+i); user.setUserAccount(Long.toString(MyCommonUtils.randNumber(10 ))); user.setAvatarUrl("../../src/img/avatar/mari.jpg" ); user.setGender((MyCommonUtils.randNumber(2 )%2 ==0 ?1 :0 )); user.setUserPassword("123456789" ); user.setPhone(Long.toString(MyCommonUtils.randNumber(10 ))); user.setEmail(Long.toString(MyCommonUtils.randNumber(10 ))+"@qq.com" ); user.setUserStatus(0 ); user.setIsDelete(0 ); user.setUserRole(0 ); user.setUserCode("123456789" ); user.setTags("[]" ); user.setProfile("这个人很懒" ); userList.add(user); } addUser.userService.saveBatch(userList, Math.max(userList.size()/10 ,1 )); return 1 ; } int middle = (end + start) / 2 ; AddUser addUser1= new AddUser (start, middle); AddUser addUser2 = new AddUser (middle+1 , end); invokeAll(addUser1, addUser2); return 1 ; } }
线程池开启配置可以通过
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
去配置ExecutorService 线程池详解 - simpleDi - 博客园