并发编程如何才能不再头疼:iOS中的协程

什么是协程

当前iOS并发编程的痛点

  • 网络、IO等耗时接口都是同步方法,一般都放在后台线程调用,然后主线程刷新UI。这里使用GCD等。
  • 缺乏统一的异步编程模型,只能使用delegate(逻辑割裂、共享则不安全)、closure(回调地狱,调试不便等)、Notification等,经常忘记调用completion等。
  • 多线程异步回调的复杂性和不可控性。
  • 多线程涉及到的锁、信号量、GCD Group等线程安全措施。还可能有野指针访问、容器操作、非主线程更新UI导致错误、过度释放等严重问题。
  • 基于Promise、GCD语法糖等第三方方案并不完美,且维护困难。
  • 响应式编程的概念复杂、调试困难、学习曲线高。

协程的基本概念

抢占式调度

当前iOS中的多线程调度都是抢占式的,每次阻塞、切换线程都需要系统调用,即CPU先执行调度程序,再切换对应的线程。而多个线程之间还会存在互斥加锁之类的开销。多线程还要非常小心地处理被抢占时的状态保存等操作。

非抢占式调度

一个进程包含多个线程,一个线程可以包含多个协程。但多个协程的执行是串行的。

多核CPU的场景是,多进程或一个进程内的多个线程可以并行运行,但是一个线程内的多个协程却只能串行执行。当一个协程运行时,其他协程必须挂起。

协程是基于线程,非抢占式调度,一般是执行完毕主动让出CPU。由协程调度器进行协程切换。

所以协程本质上是单线程的,没有线程切换的开销,也没有多线程同步加锁之类的开销。相对于线程而言,协程是非常轻量级的。

非主协程中执行的任务必须要是异步的。

协程的特点

  • 非抢占式:无需系统调用,协程调度是线程安全,无需lock等。
  • 挂起执行:保存寄存器和栈,不影响其他协程执行。
  • 恢复执行:恢复之前的寄存器和栈,无缝切换回之前的执行逻辑。

举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
co_launch(^{
NSLog(@"1");
});

co_launch(^{
co_delay(2);
NSLog(@"2");
});

co_launch(^{
co_delay(1);
NSLog(@"2-delay-1");
});

co_launch(^{
NSLog(@"3");
});

co_launch_onqueue(dispatch_queue_create(0, 0), ^{
NSLog(@"4");
});

co_launch_onqueue(dispatch_queue_create(0, 0), ^{
NSLog(@"5");
});

NSLog(@"0");
// 0 4 5 1 3 2-delay-1 2
// 4和5为后台线程。其中的4、5的顺序可能不一定。

其他编程语言中的协程

promise

promise在javascript非常常见。iOS也有类似的三方库。

async/await

js已经有了。

goroutine

go语言中的go routine和go channel可以说是非常独特的并发编程方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func goChannel() {
ch2 := make(chan string, 1)

// 关键字 go 后跟的就是需要被并发执行的代码块
go func() {
time.Sleep(time.Second * 2)
fmt.Println("协程中:数据来了")
time.Sleep(time.Second * 2)
ch2 <- " 已到达!"
}()

fmt.Println("等待数据:已阻塞")

// chan接收操作,在chan没有有效数据的情况下会被阻塞,
// 所以这里会阻塞, 等到ch2 <- "已到达!"执行完毕, 可取出的时候继续代码.

var value string = "测试数据"
value = value + <-ch2

// 重复关闭chan会导致panic
close(ch2)

fmt.Println(value)

time.Sleep(time.Second)
}

执行 go run demo-goroutine.go 结果如下:

1
2
3
等待数据:已阻塞
协程中:数据来了
测试数据 已到达!

Python中的yield

Kotlin中的Generator

自己动手使用C语言实现简单的协程

ucontext

ucontext结构体是协程中的基本结构。

1
2
3
4
5
6
7
typedef struct ucontext {
struct ucontext *uc_link; // 后继context
sigset_t uc_sigmask; // 该context中的阻塞信号集合
stack_t uc_stack; // 该context中使用的stack
mcontext_t uc_mcontext; // 保存的context的特定机器表示,包括调用线程的特定寄存器等。
...
} ucontext_t;

其中,uc_link必不可少,用于指定后继context,这样才能实现context切换。

un_stack存储当前context的数据,切换时需要保存。

ucontext相关操作函数

相关操作函数其实非常简单

getcontext

1
int getcontext(ucontext_t *ucp);

初始化ucp结构体,保存当前context到ucp中。

setcontext

1
int setcontext(const ucontext_t *ucp);

设置当前的context为ucp,该ucp通过getcontext或makecontext取得。

  1. 若该context是getcontext取得的,则继续执行该context的调用,即切换至该context中执行。
  2. 若该context是makecontext取得的,则会执行其指定的调用函数func。函数func返回,则恢复makecontext第一个参数指向的context中指向的uc_link。若uc_link为NULL,则线程退出。

getcontext和setcontext是这里的关键。请看这段C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#ifdef __APPLE__
#define _XOPEN_SOURCE
#endif

#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>

int main(int argc, const char *argv[]){
ucontext_t context;

getcontext(&context);
puts("Hello world");
sleep(1);
setcontext(&context);

puts("1111");

return 0;
}

这段代码执行的结果是一直循环输出hello world,不会终止。原因:

  1. 通过getcontext将当前上下文保存到context中
  2. 输出hello world
  3. 通过setcontext将程序恢复到之前保存的context,即又会回到getcontext的地方,继续重复执行。

makecontext

1
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);

把func关联到context。

修改通过getcontext取得的context的ucp。即调用之前必须要先调用getcontext。然后给该context指定一个ucp->stack,设置后继的context即ucp->uc_link。

当该context通过setcontext或swapcontext激活后,执行func函数,func返回后,后继的context被激活,若后继context为NULL,则线程退出。

swapcontext

1
int swapcontext(ucontext_t *oucp, ucontext_t *ucp);

保存当前上下文到oucp结构体中,然后激活ucp上下文。

一般oucp不设置,即当前context是为main即可。

swap(&main, &child); // 切换到child context,保存当前context到main。

ucontext总结

以上四个函数的作用可以简要概括如下:

  1. getcontext 保存当前上下文
  2. setcontext 设置当前上下文,类似goto
  3. swapcontext 保存当前context,然后切换context, 类似goto
  4. makecontext 创建一个新的上下文

参考资料:

使用ucontext来模拟iOS中的多线程

iOS中多线程主要是main thread和global thread的使用和切换。

我们使用ucontext来模拟多线程切换,以加深理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#ifdef __APPLE__
#define _XOPEN_SOURCE
#endif

#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>

void global_func(void *arg) {
puts("this is global_func");
puts("1234567890");
}

void test_context() {
char global_stack[1024*512];
ucontext_t main, global;

puts("this is main thread");

getcontext(&global);
global.uc_stack.ss_sp = global_stack;
global.uc_stack.ss_size = sizeof(global_stack);
global.uc_stack.ss_flags = 0;
global.uc_link = &main;
// global.uc_link = NULL;

makecontext(&global, (void (*)(void))global_func, 0);

// 切换到global,并保存当前上下文到main中。
swapcontext(&main, &global);
// 再次会导致segmentation fault
// swapcontext(&main, &global);
puts("this is main thread");
}

int main() {
test_context();

return 0;
}

输出结果:

1
2
3
4
this is main thread
this is global_func
1234567890
this is main thread

以上代码,模拟了iOS中最常见的多线程场景:

1
main thead -> 切换至global thread执行异步任务 -> 切换回main thread更新UI。

这段代码中,默认的ucontext可以视为main thread。

先创建了global的ucontext,初始化其参数,并指定其执行代码为global_func。

注意其中使用swapcontext(&main, &global);从main thread切换到global thread的代码。

在此基础之上,加入自行控制的调度逻辑,则可制作最小功能的协程库了。

coobjc

coobjc是阿里巴巴开源的iOS协程库。

1
2
3
4
5
6
7
8
9
提供了类似C#和Javascript语言中的Async/Await编程方式支持,在协程中通过调用await方法即可同步得到异步方法的执行结果,非常适合IO、网络等异步耗时调用的同步顺序执行改造。
提供了类似Kotlin中的Generator功能,用于懒计算生成序列化数据,非常适合多线程可中断的序列化数据生成和访问。
提供了Actor Model的实现,基于Actor Model,开发者可以开发出更加线程安全的模块,避免由于直接函数调用引发的各种多线程崩溃问题。
提供了元组的支持,通过元组Objective-C开发者可以享受到类似Python语言中多值返回的好处。

提供了对NSArray、NSDictionary等容器库的协程化扩展,用于解决序列化和反序列化过程中的异步调用问题。
提供了对NSData、NSString、UIImage等数据对象的协程化扩展,用于解决读写IO过程中的异步调用问题。
提供了对NSURLConnection和NSURLSession的协程化扩展,用于解决网络异步请求过程中的异步调用问题。
提供了对NSKeyedArchieve、NSJSONSerialization等解析库的扩展,用于解决解析过程中的异步调用问题。

实现原理

  1. 利用glibc的ucontext库(iOS废弃)。
  2. 使用汇编代码来切换上下文(实现C协程),原理同ucontext。
  3. 利用C语言的switch-case的技巧(Protothreads)。
  4. 利用C语言的setjmp和longjmp。
  5. 利用编译器支持语法糖。

coobjc即采用第二种,coroutine_context.s文件。 使用汇编语言来模拟ucontext,即setcontext和getcontext等函数。

1
2
3
_coroutine_getcontext
_coroutine_begin
_coroutine_setcontext

coobjc的核心原理是控制调用栈的主动让出和恢复。

  1. yield,让出CPU,会中断当前的执行,回到上一次resume的地方。
  2. resume,继续协程的执行,回到上一次yield的地方。

解决痛点

iOS业务开发过程中,存在很多网络请求、IO调用、数据解析等不能阻塞主线程的任务,可以放到后台线程执行。然而, 执行后又需要切换到主线程进行UI的更新。

一些复杂业务的线程切换代码复杂难懂。

同时,复杂并发任务需要引入GCD的group,semaphore,barrier等各种机制来配合完成,使得程序中很容易出现多线程竞争的问题,甚至出现死锁、崩溃等。

1
引入coobjc,在不改变原有业务代码的基础上,全局hook部分IO、数据解析方法,让原来主线程中同步执行的IO方法异步执行,且不影响原有业务逻辑。

一句话:以同步方式编写异步逻辑代码,防止线程切换的开销,防止lock、信号量等引发的问题。

使用方法

需要将后台线程执行的包装成异步操作!

使用Promise来包装异步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// co_launch创建的协程默认在当前线程进行调度
// 使用await方法等待异步方法执行结束,得到异步执行结果
- (void)testCoobjcAsyncPromise {
MBCTimer(@"before");
co_launch(^{
NSString *urlString = @"http://t1.hxzdhn.com/uploads/tu/201612/58/1231036404.jpg";
NSURL *url = [NSURL URLWithString:urlString];

// async downloadDataFromUrl
NSData *imageData = await([self asyncPromiseDownloadDataFromURL:url]);

// async transform data to image
UIImage *image = await([self asyncPromiseDecodeImageData:imageData]);

// set image to imageView
self.imgView.image = image;
});
MBCTimer(@"after");
}

- (COPromise *)asyncPromiseDownloadDataFromURL:(NSURL *)url {
COPromise *promise = [COPromise promise];

// 模拟异步的事件
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
NSData *imageData = [NSData dataWithContentsOfURL:url];
[promise fulfill:imageData];
// [promise reject:error];
});

return promise;
}

- (COPromise *)asyncPromiseDecodeImageData:(NSData *)imageData {
COPromise *promise = [COPromise promise];

// 模拟异步的事件
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
UIImage *image = [UIImage imageWithData:imageData];
[promise fulfill:image];
// [promise reject:error];
});

return promise;
}

Swift也可以使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
co_launch {
let result = try await(channel: co_fetchSomething())
//
}

co_launch {
let result = try await(promise: co_fetchSomethingAsynchronous())
switch resust {
case .fulfilled(let data):
//
case .rejected(let error):
//
}
}

使用Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
- (void)testCoobjcAsyncChannel {
MBCTimer(@"before");
co_launch(^{
NSString *urlString = @"http://t1.hxzdhn.com/uploads/tu/201612/58/1231036404.jpg";
NSURL *url = [NSURL URLWithString:urlString];

// async downloadDataFromUrl
NSData *imageData = await([self asyncChannelDownloadDataFromURL:url]);

// async transform data to image
UIImage *image = await([self asyncChannelDecodeImageData:imageData]);

// set image to imageView
self.imgView.image = image;
});
MBCTimer(@"after");
}

- (COChan *)asyncChannelDownloadDataFromURL:(NSURL *)url {
COChan *chan = [COChan chan];
co_launch(^{
NSData *imageData = [NSData dataWithContentsOfURL:url];
[chan send:imageData];
});
return chan;
}

- (COChan *)asyncChannelDecodeImageData:(NSData *)imageData {
COChan *chan = [COChan chan];
co_launch(^{
UIImage *image = [UIImage imageWithData:imageData];
[chan send:image];
});
return chan;
}

错误处理

协程中的所有方法都是直接返回值的,并未返回错误,而在执行过程中的错误是通过co_getError()获取的。
没有复杂的错误处理机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
- (COPromise *)co_GET:(NSString *)url
parameters:(NSDictionary *)parameters {
COPromise *promise = [COPromise promise];
[self GET:url
parameters:parameters
progress:nil
success:^(NSURLSessionDataTask * _Nonnull task, id _Nullable responseObject) {
[promise fulfill:responseObject];
}
failure:^(NSURLSessionDataTask * _Nullable task, NSError * _Nonnull error) {
[promise reject:error];
}];
}

如上的网络请求数据,失败时promise会reject:error。

使用方式:

1
2
3
4
5
6
7
8
co_launch(^{
id response = await([self co_GET:feedModel.feedUrl parameters:nil]);
if (co_getError()) {
// handle error info
return;
}
// ...
});

创建生成器

类似Python中的yield。生成器是一次生成一个值的特殊类型函数,可将其视为可恢复函数。

在函数的执行过程中,yield语句将你需要的值返回给调用生成器的地方,然后退出函数,下一次调用生成器函数的时候又从上次中断的地方开始执行。

而生成器内的所有变量参数都会被保存下来以供下一次使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
COCoroutine *co1 = co_sequence(^{
int index = 0;
while(co_isActive()) {
yield_val(@(index)); // 调用生成器将NSValue返回
index++;
}
});

co_launch(^{
for (int i = 0; i < 10; i++) {
val = [[co1 next] intValue];
}
});

类似Swift中的Sequence协议,对外封装一个next方法。

通过Generator,将传统的生产者加载数据->通知消费者,变成消费者需要数据->告诉生产者加载。

好处在哪里?

1
2
生成器可以在很多场景中进行使用,比如消息队列、批量下载文件、批量加载缓存等:
通过生成器,我们可以把传统的生产者加载数据->通知消费者模式,变成消费者需要数据->告诉生产者加载模式,避免了在多线程计算中,需要使用很多共享变量进行状态同步,消除了在某些场景下对于锁的使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int unreadMessageCount = 10;
NSString *userId = @"xxx";
COSequence *messageSequence = sequenceOnBackgroundQueue(@"message_queue", ^{
//在后台线程执行
while(1){
yield(queryOneNewMessageForUserWithId(userId));
}
});

//主线程更新UI
co(^{
for(int i = 0; i < unreadMessageCount; i++){
if(!isQuitCurrentView()){
displayMessage([messageSequence take]);
}
}
});

Actor

消息-订阅机制。

Actor就是一个容器,用于存储状态、行为、Mailbox及子Actor与Supervisor策略。Actor之间并不通信,而是通过Mail来互通。跟Mach及Mach-msg机制类似。

1
2
3
4
5
6
7
8
9
10
COActor *actor = co_actor_onqueue(^(COActorChan *channel) {
// 定义actor的状态变量
for (COActorMessage *message in channel) {
// 处理消息
}
}, q};

// 给actor发送消息
[actor send:@"send msg"];
[actor send:@(1)];

跟nodejs的订阅-发送机制一致。

Actor是一种并发模型,内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。

所以使用Actor模型进行并发编程可以很好地避免加锁等线程问题。Actor由状态(State)、行为(Behavior)和邮箱(MailBox)三部分组成。

  • State:Actor的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题。
  • Behavior:Actor中的计算逻辑,通过Actor接收到的消息来改变Actor的状态。
  • MailBox:Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor的消息,接收方Actor从邮箱队列中获取消息。

coobjc的其他tip

可取消

1
2
3
4
5
6
7
8
COCoroutine *coroutine = co_launch({
co_delay(2);
if (co_isCancelled()) {
return;
}
// 执行
});
[coroutine cancel];

可暂停

暂停当前的协程,不会影响线程中的其他协程的运行。

1
2
3
4
5
6
7
co_launch({
co_delay(2);
});

co_launch({
NSLog(@"上一个协程暂停,而这个协程可以运行");
});

注意,这里的delay可以理解为当前协程让出CPU,即挂起。此时该线程寻找所属的其他协程来执行。

错误处理

执行过程中的错误是通过co_getError()

使得OC代码支持元组

1
2
3
4
5
6
7
8
9
10
11
COTuple *tuple = co_tuple(nil, @"1", @(2), @[@1,@2], @{@"key":@"value"});
// NSArray *array = tuple[3]; // 支持下标直接取出

id nonValue;
NSString *stringValue;
NSNumber *numberValue;
NSArray *arrayValue;
NSDictionary *dictValue;
co_unpack(&nonValue, &stringValue, &numberValue, &arrayValue, &dictValue) = tuple;

NSLog(@"testOCTuple");

coobjc的源码分析

coobjc使用汇编语言实现了一套类似ucontext的基本操作。

coroutine_ucontext

类似ucontext的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct coroutine_ucontext {
uint64_t data[100];
} coroutine_ucontext_t;

struct coroutine_ucontext_re {
struct GPRs {
uint64_t __x[29]; // x0-x28
uint64_t __fp; // Frame pointer x29
uint64_t __lr; // Link register x30
uint64_t __sp; // Stack pointer x31
uint64_t __pc; // Program counter
uint64_t padding; // 16-byte align, for cpsr
} GR;
double VR[32];
};

类似ucontext的基本实现

1
2
3
4
5
6
extern int coroutine_getcontext (coroutine_ucontext_t *__ucp);

extern int coroutine_setcontext (coroutine_ucontext_t *__ucp);
extern int coroutine_begin (coroutine_ucontext_t *__ucp);

extern int coroutine_makecontext (coroutine_ucontext_t *__ucp, IMP func, void *arg, void *stackTop);

感兴趣的可以参考coobjc源码中的coroutine_context.s文件。

coroutine相关的一些结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
The structure store coroutine's context data.
*/
struct coroutine {
coroutine_func entry; // Process entry.
void *userdata; // Userdata.
coroutine_func userdata_dispose; // Userdata's dispose action.
void *context; // Coroutine's Call stack data.
void *pre_context; // Coroutine's source process's Call stack data.
int status; // Coroutine's running status.
uint32_t stack_size; // Coroutine's stack size
void *stack_memory; // Coroutine's stack memory address.
void *stack_top; // Coroutine's stack top address.
struct coroutine_scheduler *scheduler; // The pointer to the scheduler.

struct coroutine *prev;
struct coroutine *next;

void *autoreleasepage; // If enable autorelease, the custom autoreleasepage.
void *chan_alt; // If blocking by a channel, record the alt
bool is_cancelled; // The coroutine is cancelled
int8_t is_scheduler; // The coroutine is a scheduler.
};
typedef struct coroutine coroutine_t;
1
2
3
4
5
6
7
8
/**
Define the linked list of scheduler's queue.
*/
struct coroutine_list {
coroutine_t *head;
coroutine_t *tail;
};
typedef struct coroutine_list coroutine_list_t;
1
2
3
4
5
6
7
8
9
10
/**
Define the scheduler.
One thread own one scheduler, all coroutine run this thread shares it.
*/
struct coroutine_scheduler {
coroutine_t *main_coroutine;
coroutine_t *running_coroutine;
coroutine_list_t coroutine_queue;
};
typedef struct coroutine_scheduler coroutine_scheduler_t;

coroutine相关的一些操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
Create a new routine.

@param func main entrance
@return routine obj
*/
coroutine_t *coroutine_create(coroutine_func func);

/**
Add coroutine to scheduler, and resume the specified coroutine whatever.
*/
void coroutine_resume(coroutine_t *co); // 恢复

/**
Add coroutine to scheduler, and resume the specified coroutine if idle.
*/
void coroutine_add(coroutine_t *co); // 添加

/**
Yield the specified coroutine now.
*/
void coroutine_yield(coroutine_t *co); // 挂起
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
coroutine_t *coroutine_create(coroutine_func func) {
coroutine_t *co = calloc(1, sizeof(coroutine_t));
co->entry = func;
co->stack_size = STACK_SIZE;
co->status = COROUTINE_READY;

// check debugger is attached, fix queue debugging.
co_rebind_backtrace();
return co;
}

void coroutine_resume(coroutine_t *co) {
if (!co->is_scheduler) {
coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
co->scheduler = scheduler;

scheduler_queue_push(scheduler, co);

if (scheduler->running_coroutine) {
// resume a sub coroutine.
scheduler_queue_push(scheduler, scheduler->running_coroutine);
coroutine_yield(scheduler->running_coroutine);
} else {
// scheduler is idle
coroutine_resume_im(co->scheduler->main_coroutine);
}
}
}

void coroutine_add(coroutine_t *co) {
if (!co->is_scheduler) {
coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
co->scheduler = scheduler;
if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
coroutine_close_ifdead(scheduler->main_coroutine);
coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
main_co->is_scheduler = true;
main_co->scheduler = scheduler;
scheduler->main_coroutine = main_co;
}
scheduler_queue_push(scheduler, co);

if (!scheduler->running_coroutine) {
coroutine_resume_im(co->scheduler->main_coroutine);
}
}
}

// use optnone to keep the `skip` not be optimized.
__attribute__ ((optnone))
void coroutine_yield(coroutine_t *co)
{
if (co == NULL) {
// if null
co = coroutine_self();
}
BOOL skip = false;
coroutine_getcontext(co->context);
if (skip) {
return;
}
#pragma unused(skip)
skip = true;
co->status = COROUTINE_SUSPEND;
coroutine_setcontext(co->pre_context);
}

调度器coroutine_scheduler

coroutine_scheduler的相关操作

有了类似ucontext的基本数据结构和基本操作,自行设计一个调度器即可:

scheduler的调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// The main entry of the coroutine's scheduler
// The scheduler is just a special coroutine, so we can use yield.
void coroutine_scheduler_main(coroutine_t *scheduler_co) {

coroutine_scheduler_t *scheduler = scheduler_co->scheduler;
for (;;) {

// Pop a coroutine from the scheduler's queue.
coroutine_t *co = scheduler_queue_pop(scheduler);
if (co == NULL) {
// Yield the scheduler, give back cpu to origin thread.
coroutine_yield(scheduler_co);

// When some coroutine add to the scheduler's queue,
// the scheduler will resume again,
// then will resume here, continue the loop.
continue;
}
// Set scheduler's current running coroutine.
scheduler->running_coroutine = co;
// Resume the coroutine
coroutine_resume_im(co);

// Set scheduler's current running coroutine to nil.
scheduler->running_coroutine = nil;

// if coroutine finished, free coroutine.
if (co->status == COROUTINE_DEAD) {
coroutine_close_ifdead(co);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// use optnone to keep the `skip` not be optimized.
__attribute__ ((optnone))
void coroutine_resume_im(coroutine_t *co) {
switch (co->status) {
case COROUTINE_READY:
{
co->stack_memory = coroutine_memory_malloc(co->stack_size);
co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
// get the pre context
co->pre_context = malloc(sizeof(coroutine_ucontext_t));
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;

free(co->context);
co->context = calloc(1, sizeof(coroutine_ucontext_t));
coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
// setcontext
coroutine_begin(co->context);

break;
}
case COROUTINE_SUSPEND:
{
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
// setcontext
coroutine_setcontext(co->context);

break;
}
default:
assert(false);
break;
}
}

链表结构: 向队列中添加/删除coroutine对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#pragma mark - linked lists

void scheduler_queue_push(coroutine_scheduler_t *scheduler, coroutine_t *co) {
coroutine_list_t *queue = &scheduler->coroutine_queue;
if(queue->tail) {
queue->tail->next = co;
co->prev = queue->tail;
} else {
queue->head = co;
co->prev = nil;
}
queue->tail = co;
co->next = nil;
}

coroutine_t *scheduler_queue_pop(coroutine_scheduler_t *scheduler) {
coroutine_list_t *queue = &scheduler->coroutine_queue;
coroutine_t *co = queue->head;
if (co) {
queue->head = co->next;
// Actually, co->prev is nil now.
if (co->next) {
co->next->prev = co->prev;
} else {
queue->tail = co->prev;
}
}
return co;
}
1
2
3
4
5
6
7
8
9
void coroutine_makecontext (coroutine_ucontext_t *ctx, IMP func, void *arg, void *stackTop)
{
struct coroutine_ucontext_re *uctx = (struct coroutine_ucontext_re *)ctx;
uintptr_t stackBegin = (uintptr_t)stackTop - sizeof(uintptr_t);
uctx->GR.__fp = stackBegin;
uctx->GR.__sp = stackBegin;
uctx->GR.__x[0] = (uintptr_t)arg;
uctx->GR.__pc = (uintptr_t)func;
}

co_launch操作做了什么

co_launch操作用于建立一个协程,并将任务丢入其中执行。

1
2
3
co_launch({
NSLog(@"1");
});

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
Create a coroutine, then resume it asynchronous on current queue.

@param block the code execute in the coroutine
@return the coroutine instance
*/
NS_INLINE COCoroutine * _Nonnull co_launch(void(^ _Nonnull block)(void)) {
COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
return [co resume];
}

- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize {
self = [super init];
if (self) {
_execBlock = [block copy];
_dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch];

coroutine_t *co = coroutine_create((void (*)(void *))co_exec);
if (stackSize > 0 && stackSize < 1024*1024) { // Max 1M
co->stack_size = (uint32_t)((stackSize % 16384 > 0) ? ((stackSize/16384 + 1) * 16384) : stackSize); // Align with 16kb
}
_co = co;
coroutine_setuserdata(co, (__bridge_retained void *)self, co_obj_dispose);
}
return self;
}

- (COCoroutine *)resume {
COCoroutine *currentCo = [COCoroutine currentCoroutine];
BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
[self.dispatch dispatch_async_block:^{
if (self.isResume) {
return;
}
if (isSubroutine) {
self.parent = currentCo;
[currentCo addChild:self];
}
self.isResume = YES;
coroutine_resume(self.co);
}];
return self;
}

参考资料

坚持原创技术分享,您的支持将鼓励我继续创作! So,来杯咖啡?