27.信号量

前言

一、 控制 GCD 最大并发数

let queue = DispatchQueue.init(label: "RyukieQueue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
let semaphore = DispatchSemaphore(value: 2)

for i in 0...5 {
    queue.async {
        _ = semaphore.wait(timeout: .distantFuture)
        print("<<< 下载开始:\(i) 日期:\(Date())")
        sleep(5)
        print("下载结束:\(i) >>> 日期:\(Date())")
        semaphore.signal()
    }
}
<<< 下载开始:1 日期:2021-09-11 08:28:56 +0000
<<< 下载开始:0 日期:2021-09-11 08:28:56 +0000

下载结束:0 >>> 日期:2021-09-11 08:29:01 +0000
<<< 下载开始:2 日期:2021-09-11 08:29:01 +0000
下载结束:1 >>> 日期:2021-09-11 08:29:01 +0000
<<< 下载开始:3 日期:2021-09-11 08:29:01 +0000

下载结束:2 >>> 日期:2021-09-11 08:29:06 +0000
<<< 下载开始:4 日期:2021-09-11 08:29:06 +0000
下载结束:3 >>> 日期:2021-09-11 08:29:06 +0000
<<< 下载开始:5 日期:2021-09-11 08:29:06 +0000

下载结束:5 >>> 日期:2021-09-11 08:29:11 +0000
下载结束:4 >>> 日期:2021-09-11 08:29:11 +0000

二、 源码解读

2.1 dispatch_semaphore_create

/*!
 * @function dispatch_semaphore_create
 *
 * @abstract
 * 用初始值创建新的计数信号量
 *
 * @discussion
 * 当两个线程需要协调时,为该值传递0是有用的 完成某一特定事件。
 * 传递一个大于零的值 用于管理有限的资源池,其中池大小相等 的价值
 * 
 * @param value
 * 信号量的起始值。传递一个小于零的值 导致返回NULL。
 * 
 *
 * @result
 * 新创建的信号量,如果失败则为NULL
 */
API_AVAILABLE(macos(10.6), ios(4.0))
DISPATCH_EXPORT DISPATCH_MALLOC DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT
DISPATCH_NOTHROW
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value);

2.2 dispatch_semaphore_signal

/*!

 * @function dispatch_semaphore_signal
 *
 * @abstract
 * Signal (increment) a semaphore.
 *
 * @discussion
 * 增加计数信号量。如果之前的值小于零
 * 这个函数在返回之前唤醒一个等待线程
 *
 * @param dsema The counting semaphore.
 * 在这个参数中传递NULL的结果是未定义的
 *
 * @result
 * 如果线程被唤醒,这个函数返回非零值。否则,返回零
 * 
 */


intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    // +1 的操作
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
    return 0;
    }

    // wait 操作过多 和signal信号不平衡 抛出异常
    if (unlikely(value == LONG_MIN)) {
    DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    //因为是基于pthread下层的封装,为了避免下层的异常,这里做异常处理
    return _dispatch_semaphore_signal_slow(dsema);
}

2.3 dispatch_semaphore_wait

/*!
 * @function dispatch_semaphore_wait
 *
 * @abstract
 * 等待(递减)信号量
 *
 * @discussion
 * 递减计数信号量。如果结果值小于零, 
 * 这个函数在返回之前等待一个信号出现。
 *
 * @param dsema
 * 信号量。在这个参数中传递NULL的结果是未定义的
 *
 * @param timeout
 * 何时超时(参见dispatch_time)。为了方便,
 * 有 DISPATCH_TIME_NOW和DISPATCH_TIME_FOREVER常量。
 *
 * @result
 * 成功时返回零,如果超时则返回非零
 */

intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    // -1 操作
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}

_dispatch_semaphore_wait_slow

static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);

    //判断timeout
    switch (timeout) {
    default:
    if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
        break;
    }
    // Fall through and try to undo what the fast path did to
    // dsema->dsema_value
        case DISPATCH_TIME_NOW:
            orig = dsema->dsema_value;
            while (orig < 0) {
        if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                    // 处理超时
                    return _DSEMA4_TIMEOUT();
        }
            }
            // Another thread called semaphore_signal().
            // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
            //等待操作
            _dispatch_sema4_wait(&dsema->dsema_sema);
            break;
    }
    return 0;
}

_dispatch_sema4_wait

void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
    int ret = 0;
    do {
        ret = sem_wait(sema);
    } while (ret == -1 && errno == EINTR);
    DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

可以看出,内部其实是个 do-while 循环实现的。

三、 一个思考题

下面代码会是怎样的执行结果呢?

let queue = DispatchQueue.init(label: "RyukieQueue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
let semaphore = DispatchSemaphore(value: 0)

queue.async {
    _ = semaphore.wait(timeout: .distantFuture)
    print("<<< 下载开始:\(1) 日期:\(Date())")
    sleep(5)
    print("下载结束:\(1) >>> 日期:\(Date())")
}

queue.async {
    print("<<< 下载开始:\(2) 日期:\(Date())")
    sleep(5)
    print("下载结束:\(2) >>> 日期:\(Date())")
    semaphore.signal()
}

输出:

<<< 下载开始:2 日期:2021-09-11 09:05:24 +0000

下载结束:2 >>> 日期:2021-09-11 09:05:29 +0000
<<< 下载开始:1 日期:2021-09-11 09:05:29 +0000

下载结束:1 >>> 日期:2021-09-11 09:05:34 +0000

参考

Last updated