11:55:54.736 [main] DEBUG c.ThreadTest3 - ---------
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at test.ThreadTest3.lambda$main$0(ThreadTest3.java:17)
at java.lang.Thread.run(Thread.java:748)
|-------------------------------------------------------|
| Object Header (64 bits) |
|-----------------------------|-------------------------|
| Mark Word (32 bits) | Klass Word (32 bits) |
|-----------------------------|-------------------------|
数组对象
|----------------------------------------------------------------------|
| Object Header (96 bits) |
|---------------------|-----------------------|------------------------|
| Mark Word(32bits) | Klass Word(32bits) | array length(32bits) |
|---------------------|-----------------------|------------------------|
00:07:54.978 [main] DEBUG c.Test9WaitNotify - take takeout to t2
00:07:54.981 [t2] DEBUG c.Test9WaitNotify - Get takeout, do eating.
00:07:54.981 [t2] DEBUG c.Test9WaitNotify - And then do delivery
00:07:54.982 [t1] DEBUG c.Test9WaitNotify - Get tobacco, do smoking
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis(); //调用方法时间
long now = 0; //已等待时间
if (millis < 0) { //验证等待时间不能为负数
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) { //设置等待时间为0则一直等待
while (isAlive()) { //如果线程存活
wait(0);
}
} else {
while (isAlive()) { //如果线程存活
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base; //设置已等待时间
}
}
}
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
new People(random.nextInt(5000),"people1").start();
new People(random.nextInt(5000),"people2").start();
new People(random.nextInt(500),"people3").start();
TimeUnit.SECONDS.sleep(2);
MailCenter.getIds().forEach( integer -> {
new Postman(integer,"消息"+integer,"postman"+integer).start();
});
}
@Slf4j(topic = "c.People")
class People extends Thread{
private long timeout;
public People(long timeout,String threadName) {
this.timeout = timeout;
super.setName(threadName);
}
@Override
public void run() {
GuardedObject guardedObject = MailCenter.createGuardedObject();
log.debug("{} 开始等消息....",super.getName());
String s = guardedObject.get(timeout);
if(s != null)
log.debug("收到消息了:{}",s);
else {
log.debug("未收到消息");
}
}
}
Found one Java-level deadlock:
=============================
"t2":
waiting to lock monitor 0x000000000386b528 (object 0x00000000eb7c0ba0, a java.lang.Object),
which is held by "t1"
"t1":
waiting to lock monitor 0x0000000003867488 (object 0x00000000eb7c0bb0, a java.lang.Object),
which is held by "t2"
Java stack information for the threads listed above:
===================================================
"t2":
at test.Test13DeadLock.lambda$main$1(Test13DeadLock.java:33)
- waiting to lock <0x00000000eb7c0ba0> (a java.lang.Object)
- locked <0x00000000eb7c0bb0> (a java.lang.Object)
at test.Test13DeadLock$$Lambda$2/1232367853.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"t1":
at test.Test13DeadLock.lambda$main$0(Test13DeadLock.java:23)
- waiting to lock <0x00000000eb7c0bb0> (a java.lang.Object)
- locked <0x00000000eb7c0ba0> (a java.lang.Object)
at test.Test13DeadLock$$Lambda$1/2117255219.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
public static void main(String[] args){
Chopstick c1 = new Chopstick("c1");
Chopstick c2 = new Chopstick("c2");
Chopstick c3 = new Chopstick("c3");
Chopstick c4 = new Chopstick("c4");
new Philosopher("苏格拉底",c1,c2).start();
new Philosopher("柏拉图",c2,c3).start();
new Philosopher("亚里士多德",c3,c4).start();
new Philosopher("赫拉克里特",c4,c1).start();
}
static int x;
static Object m = new Object();
new Thread(()->{
synchronized(m) {
x = 10;
}
},"t1").start();
new Thread(()->{
synchronized(m) {
System.out.println(x);
}
},"t2").start();
线程对 volatile 变量的写,对接下来其它线程对该变量的读可见
volatile static int x;
new Thread(()->{
x = 10;
},"t1").start();
new Thread(()->{
System.out.println(x);
},"t2").start();
线程 start 前对变量的写,对该线程开始后对该变量的读可见
static int x;
x = 10;
new Thread(()->{
System.out.println(x);
},"t2").start();
线程结束前对变量的写,对其它线程得知它结束后的读可见。
static int x;
Thread t1 = new Thread(()->{
x = 10;
},"t1").start();
t1.join();
System.out.println(x);
static int x;
public static void main(String[] args) {
Thread t2 = new Thread(()->{
while(true) {
if(Thread.currentThread().isInterrupted()) {
System.out.println(x);
break;
}
}
},"t2")start();
new Thread(()->{
sleep(1);
x = 10;
t2.interrupt();
},"t1").start();
while(!t2.isInterrupted()) {
Thread.yield();
}
System.out.println(x);
}
对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排
volatile static int x;
static int y;
new Thread(()->{
y = 10;
x = 20;
},"t1").start();
new Thread(()->{
// x=20 对 t2 可见, 同时 y=10 也对 t2 可见
System.out.println(x);
},"t2").start();
单例模式的一些问题
实现一
// 问题1:为什么加 final (防止子类继承修改方法)
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例? (防止实例化,不能)
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全? (可以)
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由 (体现封装,可以提供泛型支持,可以对创建过程更多的控制等)
public static Singleton getInstance() {
return INSTANCE;
}
//(防止反序列化)
public Object readResolve() {
return INSTANCE;
}
}
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger();//可以传入int对象,默认为0
i.incrementAndGet(); //++i
i.getAndIncrement(); //i++;
i.getAndAdd(5); //先获取再加5
i.addAndGet(5); //先加5再获取
//可以重写传入getAndUpdate的方法,返回任何值
i.getAndUpdate( x -> x * 10);
i.updateAndGet( x -> x * 10);
updateAndGet(i,p -> p * 10);
i.get();
}
//getAndUpdate模拟实现
public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){
while(true){
int prev = i.get();
int next = operator.applyAsInt(prev);
if (i.compareAndSet(prev,next)) return next;
}
}
public static void main(String[] args) {
Student student = new Student();
AtomicReferenceFieldUpdater<Student, String> updater
= AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
updater.compareAndSet(student,null,"张三");
}
原子累加器
相较于AtomicLong,累加器LongAdder效率更高。
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
性能提升的原因:在有竞争时,设置多个累加单元,Therad-0 累加Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
LongAdder
CAS锁
使用CAS也可也实现上锁
// 不要用于实践!!!
public class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
}
//如果已经存在累加单元cells数组
if ((a = as[(n - 1) & h]) == null) { //如果cells中没有有该线程累加单元cell
if (cellsBusy == 0) { // Lock and Try to attach new Cell
Cell r = new Cell(x); // create 累加单元
if (cellsBusy == 0 && casCellsBusy()) { //加锁
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { //Slot is non-empty
rs[j] = r; //将新建的cell放入cells
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//如果累加成功,退出循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//判断cells数组长度是否大于CPU个数
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide) //防止执行下一个执行扩容的else if
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); //CPU超过上限,尝试改变线程累加单元累加
sum():
public long sum() {
Cell[] as = cells; Cell a;
long sum = base; //从基础中先获取值
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value; //把所有Cell中的值加在一起
}
}
return sum;
}
Unsafe
Unsafe对象提供了非常底层的操作内存、线程的方法,切不能直接调用,只能通过反射获得。
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe)theUnsafe.get(null);
//获取域的偏移量
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
Teacher teacher = new Teacher();
//执行cas操作
unsafe.compareAndSwapInt(teacher,idOffset,0,1);
unsafe.compareAndSwapObject(teacher,nameOffset,null,"张三");
利用Unsafe实现AtomicInteger
class MyAtomicInteger{
private volatile int value;
private static final long valueOffset; //保存域的偏移量
private static final Unsafe UNSAFE;
public MyAtomicInteger(int value) {
this.value = value;
}
static{
UNSAFE = UnsafeAccessor.getUnsafe();
try {
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public int getValue(){
return value;
}
public void decrement(int amount){
while(true){
int prev = this.value;
int next = prev - amount;
if (UNSAFE.compareAndSwapInt(this,valueOffset,prev,next)) {
break;
}
}
}
}
共享模式之不可变
不可变设计
String也是不可变的:
//final修饰,防止子类继承重写方法
public final class String{
/** The value is used for character storage. */
//final修饰防止char数组的引用被修改
private final char value[];
/** Cache the hash code for the string */
//私有,不设置set方法,使hash不可变
private int hash; // Default to 0
//其所有构造方法都会将传入对象的值复制给value指针
public String(String original) {
this.value = original.value;
this.hash = original.hash;
}
//保护性拷贝
public String substring(int beginIndex) {
... //校验
//返回的是一个新的字符串
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
...
}
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
缓存线程池newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static void main(String[] args){
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
String s = guardedObject.get(500);
log.debug("t1 print response:{}.",s);
},"t1").start();
new Thread(() -> {
log.debug("i have to sleep");
TimeUnit.SECONDS.sleep(2);
log.debug("i have finished");
guardedObject.setResponse("hello world!");
},"t2").start();
}
@Slf4j(topic = "c.GuardedObject")
class GuardedObject{
private String response; //线程间传递的对象
public String get(long timeout){
final long start = System.currentTimeMillis(); //记录开始时间
long waitTime; //等待时间
synchronized (this){
while(response == null){ //循环等待直到有结果
long current = System.currentTimeMillis(); //获取每次得到锁的时间
waitTime = (current - start);
if(waitTime > timeout) { //如果超时
log.error("overtime waiting..return null");
return null;
}
this.wait(timeout - waitTime);
}
}
return response;
}
public void setResponse(String response){
synchronized (this){
this.response = response;
this.notifyAll(); //设置结果提醒所有线程
}
}
}
//消息类,被定义为final 防止子类重载方法
final class Message{
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
//只设置get,方式发送修改保证线程安全
public int getId() { return id; }
public Object getValue() { return value; }
}
//消息队列类,缓冲生产者和消费者中间的Message
class MessageQueue{
private LinkedList<Message> list = new LinkedList<>();
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
public Message take(){
synchronized (list){
while(list.isEmpty()){
log.debug("队列已空,请等待生产者生产");
list.wait();
}
Message message = list.removeFirst();
log.debug("消费:"+message);
list.notifyAll(); //唤醒因调用put方法而阻塞的线程
return message;
}
};
public void put(Message message){
synchronized (list){
while(list.size() >= capcity){
log.debug("队列已满,请等待消费者消费");
list.wait();
}
list.add(message);
log.debug("生产"+message);
list.notifyAll(); //唤醒因调用take方法而阻塞的线程
}
}
}
public static void main(String[] args){
//创建消息队列,初始化容量为2
MessageQueue messageQueue = new MessageQueue(2);
for (int i =0 ;i < 3 ;i++){
int id = i;
new Thread(() -> {
messageQueue.put(new Message(id,"产品"+id));
},"生产者"+id).start();
}
new Thread(() -> {
while(true){
ThreadSleep.sleep(1);
Message take = messageQueue.take();
}
},"消费者").start();
}
@Slf4j(topic = "c.Pool")
class Pool{
private final int poolSize; //连接池大小
private Connection[] connections; //保存连接池
private AtomicIntegerArray states; //连接池中连接的使用状态
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
//循环实例化对应个数的连接池
for (int i = 0;i < poolSize; i++){
this.connections[i] = new MyConnection();
}
}
public Connection getConnection(){
while(true){
for (int i = 0; i < poolSize;i++){
//找到可用的连接池并取出
if (states.get(i) == 0 && states.compareAndSet(i,0,1)){
log.debug("获取到了连接 connection{}", i+1);
return connections[i];
}
}
//未找到可用连接池,等待
synchronized (this){
this.wait();
}
}
}
public void resetConnection(Connection connection){
for (int i = 0;i < poolSize; i++){
if(connections[i] == connection){
states.set(i,0);
synchronized (this){
log.debug("归还连接。。。");
this.notifyAll();
}
break;
}
}
}
}
public static void main(String[] args) {
Pool pool = new Pool(2);
for(int i = 0;i < 4;i++){
new Thread(() ->{
Connection connection = pool.getConnection();
ThreadSleep.sleep(1);
pool.resetConnection(connection);
}).start();
}
}