Handler机制回顾

Handler机制回顾

Posted by Cc1over on January 16, 2020

Handler机制回顾

前言

最近这段时间,回过头去看一些以前学习过的知识,真的是同样的代码有不同的感受,以前看不懂没理解的地方,慢慢地可以更加深入地理解了

应用启动流程

应用进程的启动流程主要为:

  • Launcher-> startActivity
  • AMS-> startActivity
  • Zygote-> fork进程
  • ActivityThread-> main
    • ActivityThread-> attach
    • handleBindApplication
    • attachBaseContext
    • installContentProviders
    • Application-> onCreate

在应用启动过程中给我们能拿到回调接口的只有attachBaseContext和onCreate,而在ActivityThread的main函数里还会做主线程的Looper的prepare,然后开启循环

而这就是最早的对Looper的初始化工作

Looper

ActivityThread-> main

public static void main(String[] args) {
    // ......
    Looper.prepareMainLooper();
    Looper.loop();
    // ......
}

Looper-> prepareMainLooper

public static void prepareMainLooper() {
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }

通过prepare方法把主线程的Looper设置到ThreadLocal中,然后用静态成员sMainLooper保存一份副本

Looper-> prepare

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
 }

使用Looper中的静态ThreadLocal保存一个Looper对象,保证了多线程读共享数据的安全问题,不需加锁就可以同步访问,而且保证了一个线程只有一个Looper对象

Looper

private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }

在Looper创建的时候会创建一个MessageQueue,然后储存当前的线程

Looper-> loop

public static void loop() {
        // 获取当前线程的Looper
        final Looper me = myLooper();
        // 为空抛出异常
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        // 获取Looper中的MessageQueue
        final MessageQueue queue = me.mQueue;
    
        // ......
    
        // 开启死循环
        for (;;) {
            // 从MessageQueue中获取下一个Message
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            // 可以通过向Looper注册一个Logging,在获取Message的时候用Logging打印信息
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;

            final long traceTag = me.mTraceTag;
            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }
            final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            final long end;
            try {
                // 获取msg的traget,也执行handler的dispatchMessage方法
                msg.target.dispatchMessage(msg);
                end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (slowDispatchThresholdMs > 0) {
                final long time = end - start;
                if (time > slowDispatchThresholdMs) {
                    Slog.w(TAG, "Dispatch took " + time + "ms on "
                            + Thread.currentThread().getName() + ", h=" +
                            msg.target + " cb=" + msg.callback + " msg=" + msg.what);
                }
            }
            // 用logging打印信息
            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // ......
            // 把Message回收到池中
            msg.recycleUnchecked();
        }
    }

主要的流程就是:

  • 从MessageQueue中获取下一个Message
  • 获取msg的trage,执行消息分发
  • 把Message回收到池中

这里还有一个关注点就是Logging,Looper对外暴露接口用于设置Logging,而这个Logging可以用于打印堆栈信息,从而定位耗时代码

## Handler

Handler()

public Handler() {
    this(null, false);
}

Handler(Callback callback, boolean async)

public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            // 打印log,提示Handler内存泄漏
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
        // 获取当前线程的Looper
        mLooper = Looper.myLooper();
        // 如果Looper为空就抛出异常
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        // 赋值操作
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }

Handler-> sendMessageAtTime

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

handler中有一系列sendMessage以及post方法,但是最后都会转调Handler的sendMessageAtTime

Handler-> enqueueMessage

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

会给Message设置一个target,这个target就是发送这个Message的Handler,然后如果Handler设置了异步,就会给这个Message设置异步,而最后就是转调enqueueMessage把Message添加到MessageQueue中

然后在Looper的loop方法中会死循环拿到Message,然后根据target进行分发,而这个target就是入队前设置进去的

Handler-> dispatcherMessage

public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

首先先判断Message中的callback是否存在,如果Message的callback存在就,直接执行callback的run方法,这种情况针对post一个Runnable的情况

然后如果Handler中的mCallback不为空时,则调用mCallback的handleMessage方法

最后则调用Handler的handleMessage方法

Handler-> handleCallback

private static void handleCallback(Message message) {
     message.callback.run();
}

MessageQueue

MessageQueue

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    // 初始化native层的MessageQueue,然后存储它的地址到mPtr中
    mPtr = nativeInit();
}

MessageQueue-> enqueueMessage

boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            // p==null代表MessageQueue为空,或者msg的触发时间是队列中最早的
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                // 阻塞需要唤醒
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                // 将消息按时间顺序插入到MessageQueue
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            // 唤醒队列
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

MessageQueue中的Message是按照一定的时间顺序进行排列的,当需要插入消息时,会从队头开始遍历,直到找到消息应该插入的合适位置,以此保证所有消息的时间顺序

MessageQueue-> next

Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            // 阻塞操作,等待nextPollTimeoutMillis时长,或者消息队列被唤醒,都会返回
            nativePollOnce(ptr, nextPollTimeoutMillis);
           
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                // 如果设置了同步屏障,在消息队列中找下一个异步消息
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                
                if (msg != null) {
                    // 当异步消息触发时间大于当前时间,则设置下一次轮询的超时时长
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        // 获取一条消息并返回
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        // 设置消息的使用状态,即flags |= FLAG_IN_USE
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    // 没有消息
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                // 如果消息队列正在退出,返回null
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                // 当消息队列为空,或者消息队列有第一个消息的时候
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            // 执行idle handlers
            // 执行完成后,重置pendingIdleHandlerCount为0 
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                // 去掉handler的引用
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    // idle时执行
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            // 重置idle handler个数为0,以保证不会再次重复运行
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            // 当调用一个空闲handler时,一个新message能够被分发
            // 因此无需等待可以直接查询pending message
            nextPollTimeoutMillis = 0;
        }
    }

nativePollOnce是阻塞操作,其中nextPollTimeoutMillis代表下一个消息到来前,还需要等待的时长,当nextPollTimeoutMillis = -1时,表示消息队列中无消息,会一直等待下去

当处于空闲时,往往会执行IdleHandler中的方法,当nativePollOnce返回后,next从mMessages中提取一个消息

MessageQueue-> quit

void quit(boolean safe) {
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
        synchronized (this) {
            // 防止多次执行退出操作
            if (mQuitting) { 
                return;
            }
            mQuitting = true;
            if (safe) {
                // 移除尚未触发的所有消息
                removeAllFutureMessagesLocked(); 
            } else {
                // 移除所有的消息
                removeAllMessagesLocked(); 
            }
            nativeWake(mPtr);
        }
    }

这是在Looper创建的时候就留下的伏笔,一个safe的参数,作用于MessageQueue的quit方法:

  • 如果safe为true:移除尚未触发的所有消息
  • 如果safe为false:移除所有消息

小结

Handler是一个典型的生产者消费者模式的体现,Handler把Message发送到MessageQueue中,然后Looper开启循环不断从MessageQueue中获取下一条Message,然后找到Message的target进行分发,而当MessageQueue为空时则阻塞,等待下一条消息到来的时间,没有消息就一直等待,直到有新的消息添加到MessageQueue中,就会执行唤醒,而且唤醒的时候还有移除同步屏障的时候,而等待唤醒的操作都是由native层来完成的

而下半部分主要就针对native层的等待唤醒机制进行分析,主要还是在Java层中调用的3个native方法的分析:

nativeInit,nativeWake,nativePollOnce

MessageQueue-> nativeInit

android_os_MessageQueue.cpp-> android_os_MessageQueue_nativeInit

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // 初始化native层Message
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    // 增加引用计数
    nativeMessageQueue->incStrong(env); 
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue

NativeMessageQueue::NativeMessageQueue()
            : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // 获取当前线程的Looper
    mLooper = Looper::getForThread(); 
    if (mLooper == NULL) {
        // 创建一个native层的Looper
        mLooper = new Looper(false);
        // 把这个Looper设置到当前线程中
        Looper::setForThread(mLooper); 
    }
}

在native层的MessageQueue的初始化工作中会创建一个native层的Looper,而native层的Looper实际上和Java层的Looper是没有关联的,只是在native层实现了类似的逻辑,而真正构建关联的只有MessageQueue

Looper

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // 构建唤醒事件的fd
    mWakeEventFd = eventfd(0, EFD_NONBLOCK); 
    AutoMutex _l(mLock);
    // 重建epoll事件	       
    rebuildEpollLocked();  
}

Looper-> rebuildEpollLocked

void Looper::rebuildEpollLocked() {
    if (mEpollFd >= 0) {
        // 关闭旧的epoll实例
        close(mEpollFd); 
    }
    // 创建新的epoll实例
    mEpollFd = epoll_create(EPOLL_SIZE_HINT); 
    struct epoll_event eventItem;
    // 把未使用的数据区域进行置0操作
    memset(& eventItem, 0, sizeof(epoll_event)); 
    //可读事件
    eventItem.events = EPOLLIN; 
    eventItem.data.fd = mWakeEventFd;
    // 将唤醒事件(mWakeEventFd)添加到epoll实例(mEpollFd)
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // ......
}

首先先关闭掉旧的Epoll实例,然后调用epoll_create函数创建一个新的epoll实例

然后调用epoll_ctl函数去对mWakeEventFd这个唤醒事件添加一个可读的事件

MessageQueue-> nativePollOnce

android_os_MessageQueue-> android_os_MessageQueue_nativePollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    // 将Java层传递下来的mPtr转换为nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 
}

NativeMessageQueue-> pollOnce

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    // ......
    mLooper->pollOnce(timeoutMillis); 
    // ......
}

Looper-> pollOnce

inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL); 5
}

Looper-> pollOnce

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // ......
        result = pollInner(timeoutMillis);
    }
}

Looper-> pollInner

int Looper::pollInner(int timeoutMillis) {
    // ......
    // fd最大个数为16
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 
    // 等待事件发生或者超时,在nativeWake函数,向管道写端写入字符,则该方法会返回;
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // ......
     for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // 已经唤醒了,则读取并清空管道数据
                awoken(); 
            }
        }
     // ...... 
    return result;
}

在pollInner函数中找到我想看的逻辑,就是用epoll_wait等待事件的上报,就在这里对在native层构建的Looper中mWakeEventFd进行监听

然后已经唤醒之后会读取并清空管道数据

Looper-> awoken

void Looper::awoken() {
    uint64_t counter;
    //不断读取管道数据,目的就是为了清空管道内容
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

MessageQueue-> nativeWake

android_os_MessageQueue-> android_os_MessageQueue_nativeWake

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake(); 
}

NativeMessageQueue-> wake

void NativeMessageQueue::wake() {
    mLooper->wake(); 
}

Looper-> wake

void Looper::wake() {
    uint64_t inc = 1;
    // 向管道mWakeEventFd写入字符1
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

其中TEMP_FAILURE_RETRY是一个宏定义, 当执行write失败后,会不断重复执行,直到执行成功为止

参考资料:

Android消息机制1-Handler(Java)

Android消息机制2-Handler(Native层)

select/poll/epoll对比分析