深入聊聊Android消息机制中的消息队列的设计

2019-09-15

背景

Android系统中有一个核心的技术点——Android消息机制。

无论是系统开发者,还是应用开发者,深入理解这个机制,都是必要的,对日常的开发能起到事半功倍的效果。

在这个这个机制中,有四个非常重要的角色:Handler Looper MessageQueue Message,从它们的名字都能看出来它们各自的作用:

  1. Handler: 中文翻译叫手柄,可以用它控制消息的发送
  2. Looper:一个循环辅助工具,可以让线程循环起来
  3. MessageQueue:存储消息的数据结构,也就是接下来我将会深入分析的
  4. Message:消息本身

作为系统的核心,高性能是必不可少的。这篇文章,侧重点主要在消息队列的设计上,但也避免不了涉及到其它的三个角色,因为它们是相互配合着的。

注意:下面涉及到的代码,是基于Android-28,老版本的代码可能会有细微差异,不过不影响阅读。

回顾下队列

我们都知道,在大部分有关数据结构的书籍中,Queue是一种先进先出的数据结构,和Stack的先进后出相对应。

队列的主要操作有:入队、出队、判断队列是为空等。队列可以辅助解决一些广度优先遍历的问题,比如:二叉树的层序遍历

在日常开发中,Queue使用也还比较多的。例如:Volley源码中就使用了BlockingQueue维护Request; 还有,OKHttp中同样是使用了Queue来维护Request

MessageQueue

Android系统中的这个消息队列,也符合传统队列的特性吗?它内部使用链表实现,还是数组实现?它也是先进先出的吗?

类似,但是又有点不一样。

链表 VS 数组?

队列可以用数组实现,也可以用链表实现,这要看具体的场景。

那Android消息机制中的这个MessageQueue应该选用什么数据结构来实现呢?

我们细想下,这里肯定选用链表,因为消息的插入和删除非常频繁,链表是较好的选择,而且,使用链表带来的另外一个好处,就是扩容很简单。

如果使用数组的话,扩容会涉及到数组的拷贝,这对性能损耗太大。

先进先出?

这点和传统的队列有点不一样,主要区别在于Android的这个队列中的消息是按照时间先后顺序来存储的,时间较早的消息,越靠近队头。

当然,我们也可以理解成,它是先进先出的,只是这里的依据的不是谁先入队,而是消息待发送的时间。

如何实现唤醒和阻塞?

我们知道,Java中实现线程的唤醒和阻塞,有以下一些方法:

Object.wait() / notify()

Java并包中提供的一些工具类:CountDownLatch Barrier所使用的UnSafe底层的park方法

起初,Android系统的这个消息队列使用的是Object中的wait和notify;

后期,Google团队优化了下,使用了Linux系统的epoll机制,而且,此时的消息队列在Java层和Native层(c++层)分别有一个,逻辑类似。

消息需要延时发送,怎么办?

当我们往消息队列中加入消息时,一般是通过Handler的一些方法,如下:

public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }
    public final boolean postAtTime(Runnable r, long uptimeMillis)
    {
        return sendMessageAtTime(getPostMessage(r), uptimeMillis);
    }

    .......

    public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }

最终都会调用sendMessageAtTime(Message msg, long uptimeMillis)。

看延迟发送的那个方法:sendMessageDelayed,我们会发现,它只是在将消息发送的时间加上需要延迟的间隔时间。

那最终又是如何实现延迟的呢?

一句话:利用的还是epoll机制。

epoll机制涉及到epoll_wait这个系统调用,有一个参数是timeout,表示epoll_wait()将要阻塞的毫秒数,具体描述如下:

The epoll_wait() system call waits for events on the epoll(7)
 instance referred to by the file descriptor epfd.  The memory area
 pointed to by events will contain the events that will be available
 for the caller.  Up to maxevents are returned by epoll_wait().  The
 maxevents argument must be greater than zero.

 The timeout argument specifies the number of milliseconds that
 epoll_wait() will block.  Time is measured against the
 CLOCK_MONOTONIC clock.  The call will block until either:

 *  a file descriptor delivers an event;

 *  the call is interrupted by a signal handler; or

 *  the timeout expires.

 Note that the timeout interval will be rounded up to the system clock
 granularity, and kernel scheduling delays mean that the blocking
 interval may overrun by a small amount.  Specifying a timeout of -1
 causes epoll_wait() to block indefinitely, while specifying a timeout
 equal to zero cause epoll_wait() to return immediately, even if no
 events are available.

所以,我们可以将消息的发送时间和系统当前时间作一个差值, 将这个差值作为epoll_wait()的第三个参数timeout。

具体逻辑分布在如下这些代码中:

  1. Java层的MessageQueue出队方法next()
  2. Native层的MessageQueue pollOnce
  3. Native层Looper.cpp pollOnce和pollInner方法

消息使用完了,就扔了吗?

答案肯定是否定的。

这里很有必要使用缓存技术,减少消息创建的成本。当我们每次需要创建Message的时候,从缓存池中获取,如果缓存池没有,再创建。

这就是为什么Handler中封装了一系列方法:obtainMessage

消息缓存池的逻辑如下:

/**
  * Return a new Message instance from the global pool. Allows us to
  * avoid allocating new objects in many cases.
  */
    public static Message obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                Message m = sPool;
                sPool = m.next;
                m.next = null;
                m.flags = 0; // clear in-use flag
                sPoolSize--;
                return m;
            }
        }
        return new Message();
    }

附上消息队列的入队和出队列方法源码

boolean enqueueMessage(Message msg, long when) {
      if (msg.target == null) {
          throw new IllegalArgumentException("Message must have a target.");
      }
      if (msg.isInUse()) {
          throw new IllegalStateException(msg + " This message is already in use.");
      }

      synchronized (this) {
          if (mQuitting) {
              IllegalStateException e = new IllegalStateException(
                      msg.target + " sending message to a Handler on a dead thread");
              Log.w(TAG, e.getMessage(), e);
              msg.recycle();
              return false;
          }

          msg.markInUse();
          msg.when = when;
          Message p = mMessages;
          boolean needWake;
          if (p == null || when == 0 || when < p.when) {
              // New head, wake up the event queue if blocked.
              msg.next = p;
              mMessages = msg;
              needWake = mBlocked;
          } else {
              // Inserted within the middle of the queue.  Usually we don't have to wake
              // up the event queue unless there is a barrier at the head of the queue
              // and the message is the earliest asynchronous message in the queue.
              needWake = mBlocked && p.target == null && msg.isAsynchronous();
              Message prev;
              for (;;) {
                  prev = p;
                  p = p.next;
                  if (p == null || when < p.when) {
                      break;
                  }
                  if (needWake && p.isAsynchronous()) {
                      needWake = false;
                  }
              }
              msg.next = p; // invariant: p == prev.next
              prev.next = msg;
          }

          // We can assume mPtr != 0 because mQuitting is false.
          if (needWake) {
              nativeWake(mPtr);
          }
      }
      return true;
  }

  Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

总结

当分析完Android消息机制中的这个MessageQueue,我们能体会到,为了提高性能,MessageQueue使用到了很多优雅的技术点。

参考

--EOF--

编辑于 2019-09-15 14:43