你好,歡迎來到IOS教程網

 Ios教程網 >> IOS編程開發 >> IOS開發基礎 >> RACSignal的Subscription深入分析

RACSignal的Subscription深入分析

編輯:IOS開發基礎

ReactiveCocoa是一個FRP的思想在Objective-C中的實現框架,目前在美團的項目中被廣泛使用。對於ReactiveCocoa的基本用法,網上有很多相關的資料,本文不再討論。RACSignal是ReactiveCocoa中一個非常重要的概念,而本文主要關注RACSignal的實現原理。在閱讀之前,你需要基本掌握RACSignal的基本用法。

本文主要包含2個部分,前半部分主要分析RACSignal的subscription過程,後半部分是對前半部分的深入,在subscription過程的基礎上分析ReactiveCocoa中比較難理解的兩個操作:multicast && replay。

PS:為了解釋清楚,我們下面只討論next,不討論error以及completed,這二者與next類似。本文基於ReactiveCocoa 2.x版本。

我們先刨析RACSignal的subscription過程

RACSignal的常見用法

-(RACSignal *)signInSignal {
// part 1:[RACSignal createSignal]來獲得signal
  return [RACSignal createSignal:^RACDisposable *(id subscriber) {
    [self.signInService
     signInWithUsername:self.usernameTextField.text
     password:self.passwordTextField.text
     complete:^(BOOL success) {
    // part 3: 進入didSubscribe,通過[subscriber sendNext:]來執行next block
       [subscriber sendNext:@(success)];
       [subscriber sendCompleted];
     }];
    return nil;
  }];
}
// part 2 : [signal subscribeNext:]來獲得subscriber,然後進行subscription
[[self signInSignal] subscribeNext:^(id x) { 
    NSLog(@"Sign in result: %@", x); 
}];

Subscription過程概括

RACSignal的Subscription過程概括起來可以分為三個步驟:

  1. [RACSignal createSignal]來獲得signal

  2. [signal subscribeNext:]來獲得subscriber,然後進行subscription

  3. 進入didSubscribe,通過[subscriber sendNext:]來執行next block

步驟一:[RACSignal createSignal]來獲得signal

RACSignal.m中:

+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {
  return [ RACDynamicSignal   createSignal :didSubscribe];
}
RACDynamicSignal.m中
+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {
  RACDynamicSignal *signal = [[ self   alloc ] init ];
 signal-> _didSubscribe = [didSubscribe copy ];
  return [signal setNameWithFormat : @"+createSignal:" ];
}

[RACSignal createSignal]會調用子類RACDynamicSignal的createSignal來返回一個signal,並在signal中保存後面的 didSubscribe這個block

步驟二:[signal subscribeNext:]來獲得subscriber,然後進行subscription

RACSignal.m中:
- ( RACDisposable *)subscribeNext:( void (^)( id x))nextBlock {
  RACSubscriber *o = [ RACSubscriber   subscriberWithNext :nextBlock error : NULL   completed : NULL ];
  return [ self  subscribe :o];
}
RACSubscriber.m中:
+ ( instancetype )subscriberWithNext:( void (^)( id x))next error:( void (^)( NSError *error))error completed:( void (^)( void ))completed {
  RACSubscriber *subscriber = [[ self   alloc ] init ];
 subscriber-> _next = [next copy ];
 subscriber-> _error = [error copy ];
 subscriber-> _completed = [completed copy ];
  return subscriber;
}
RACDynamicSignal.m中:
- (RACDisposable *)subscribe:(id)subscriber {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];
        [disposable addDisposable:schedulingDisposable];
    }
    return disposable;
}
  1. [signal subscribeNext]先會獲得一個subscriber,這個subscriber中保存了nextBlock、errorBlock、completedBlock

  2. 由於這個signal其實是RACDynamicSignal類型的,這個[self subscribe]方法會調用步驟一中保存的didSubscribe,參數就是1中的subscriber

步驟三:進入didSubscribe,通過[subscriber sendNext:]來執行next block

RACSubscriber.m中:
- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;
        nextBlock(value);
    }
}

任何時候這個[subscriber sendNext:],就直接調用nextBlock

signal的subscription過程回顧

從上面的三個步驟,我們看出:

  • 先通過createSignal和subscribeNext這兩個調用,聲明了流中value到來時的處理方式

  • didSubscribe block塊中異步處理完畢之後,subscriber進行sendNext,自動處理

搞清楚了RAC的subscription過程,接著在此基礎上我們討論一個RACSignal中比較容易混淆的兩個操作:multicast和replay。

為什麼要清楚這兩者的原理

RACSignal+Operation.h中
- (RACMulticastConnection *)publish;
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
- (RACSignal *)replay;
- (RACSignal *)replayLast;
- (RACSignal *)replayLazily;
  • 在RACSignal+Operation.h中,連續定義了5個跟我們這個主題有關的RACSignal的操作,這幾個操作的區別很細微,但用錯的話很容易出問題。只有理解了原理之後,才明白它們之間的細微區別

  • 很多時候我們意識不到需要用這些操作,這就可能因為side effects執行多次而導致程序bug

multicast && replay的應用場景

"Side effects occur for each subscription by default, but there are certain situations where side effects should only occur once – for example, a network request typically should not be repeated when a new subscriber is added."

// 引用ReactiveCocoa源碼的Documentation目錄下的一個例子
// This signal starts a new request on each subscription.
RACSignal *networkRequest = [RACSignal createSignal:^(id subscriber) {
    AFHTTPRequestOperation *operation = [client
        HTTPRequestOperationWithRequest:request
        success:^(AFHTTPRequestOperation *operation, id response) {
            [subscriber sendNext:response];
            [subscriber sendCompleted];
        }
        failure:^(AFHTTPRequestOperation *operation, NSError *error) {
            [subscriber sendError:error];
        }];
    [client enqueueHTTPRequestOperation:operation];
    return [RACDisposable disposableWithBlock:^{
        [operation cancel];
    }];
}];
// Starts a single request, no matter how many subscriptions `connection.signal`
// gets. This is equivalent to the -replay operator, or similar to
// +startEagerlyWithScheduler:block:.
RACMulticastConnection *connection = [networkRequest multicast:[RACReplaySubject subject]];
[connection connect];
[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber one: %@", response);
}];
[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber two: %@", response);
}];
  1. 在上面的例子中,如果我們不用RACMulticastConnection的話,那就會因為執行了兩次subscription而導致發了兩次網絡請求。

  2. 從上面的例子中,我們可以看到對一個Signal進行multicast之後,我們是對connection.signal進行subscription而不是原來的networkRequest。這點是"side effects should only occur once"的關鍵,我們將在後面解釋

multicast原理分析

replay是multicast的一個特殊case而已,而multicast的整個過程可以拆分成兩個步驟,下面進行詳細討論

multicast的機制Part 1:

RACMulticastConnection.m中:
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);
    self = [super init];
    if (self == nil) return nil;
    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;
    return self;
}

結合上面的例子來看,RACMulticastConnection的init是以networkRequest作為sourceSignal,而最終connnection.signal指的是[RACReplaySubject subject]

RACMulticastConnection.m中:
- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }
    return self.serialDisposable;
}
  • 結合上面的RACSignal分析的Subscription過程,[self.sourceSignal subscribe:_signal]會執行self.sourceSignal的didSubscribe這個block。再結合上面的例子,也就是說會把_signal作為subscriber,發網絡請求,success的時候,_signal會sendNext,這裡的這個signal就是[RACReplaySubject subject]。可以看出,一旦進入到這個didSubscribe中,後續的不管是sendNext還是subscription,都是對這個[RACReplaySubject subject]進行的,與原來的sourceSignal徹底無關了。這就解釋了為什麼"side effects only occur once"。

multicast的機制Part 2:

在進行multicast的步驟二之前,需要介紹一下RACSubject以及RACReplaySubject

---------------------惱人的分隔線 start------------------

RACSubject

"A subject can be thought of as a signal that you can manually control by sending next, completed, and error."

RACSubject的一個用法如下:

RACSubject *letters = [RACSubject subject];
// Outputs: A B
[letters subscribeNext:^(id x) {
    NSLog(@"%@ ", x);
}];
[letters sendNext:@"A"];
[letters sendNext:@"B"];

接下來分析RACSubject的原理

RACSubject.m中:
- (id)init {
    self = [super init];
    if (self == nil) return nil;
    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];    
    return self;
}
  • RACSubject中有一個subscribers數組

RACSubject.m中:
- (RACDisposable *)subscribe:(id)subscriber {
    NSCParameterAssert(subscriber != nil);
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    return [RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            // Since newer subscribers are generally shorter-lived, search
            // starting from the end of the list.
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];
            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }];
}
  • 從subscribe:的實現可以看出,對RACSubject對象的每次subscription,都是將這個subscriber加到subscribers數組中而已

RACSubject.m中:
- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id subscriber) {
        [subscriber sendNext:value];
    }];
}

從sendNext:的實現可以看出,每次RACSubject對象sendNext,都會對其中保留的subscribers進行sendNext,如果這個subscriber是RACSignal的話,就會執行Signal的next block。

RACReplaySubject

"A replay subject saves the values it is sent (up to its defined capacity) and resends those to new subscribers.",可以看出,replaySubject是可以對它send next(error,completed)的東西進行buffer的。

  • RACReplaySubject是繼承自RACSubject的,它的內部的實現例如subscribe:、sendNext:的實現也會調用super的實現

RACReplaySubject.m中:
- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    if (self == nil) return nil;
    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
    return self;
}
  • 從init中我們看出,RACReplaySubject對象持有capacity變量(用於決定valuesReceived緩存多少個sendNext:出來的value,這在區分replay和replayLast的時候特別有用)以及valuesReceived數組(用來保存sendNext:出來的value),這二者接下來會重點涉及到

RACReplaySubject.m中:
- (RACDisposable *)subscribe:(id)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;
                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }
            if (compoundDisposable.disposed) return;
            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];
    [compoundDisposable addDisposable:schedulingDisposable];
    return compoundDisposable;
}
  • 從subscribe:可以看出,RACReplaySubject對象每次subscription,都會把之前valuesReceived中buffer的value重新sendNext一遍,然後調用super把當前的subscriber加入到subscribers數組中

RACReplaySubject.m中:
- (void)sendNext:(id)value {
    @synchronized (self) {
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}

從sendNext:可以看出,RACReplaySubject對象會buffer每次sendNext的value,然後會調用super,對subscribers中的每個subscriber,調用sendNext。buffer的數量是根據self.capacity來決定的

---------------------惱人的分隔線 end------------------

介紹完了RACReplaySubject之後,我們繼續進行multicast的part 2部分。

在上面的例子中,我們對connection.signal進行了兩次subscription,結合上面的RACReplaySubject的subscription的subscribe:,我們得到以下過程:

  • [RACReplaySubject subject]會將這兩次subscription過程中的subscriber都保存在subscribers數組中

  • 當網絡請求success後,會[subscriber sendNext:response],前面已經講過這個subscriber就是[RACReplaySubject subject],這樣,就會把sendNext:的value保存在valuesReceived數組中,供後續subscription使用(不知道你是否注意到RACReplaySubject的subscribe:中有個for循環),然後對subscribers中保存的每個subscriber執行sendNext。

後續思考

上面討論的是RACReplaySubject對象先進行subscription,再進行sendNext,如果是先sendNext,再subscription呢?其實魅力就在於RACReplaySubject的subscribe:中的for循環。具體過程留作思考

在RACSignal+Operation中關於multicast && replay的,一共有5個操作:publish、multicast、replay、replayLast、replayLazily,他們之間有什麼細微的差別呢?相信在我上面內容的基礎上,他們之間的細微差別不難理解,這裡推薦一篇幫助大家理解的blog

參考資料

ReactiveCocoa github主頁

ReactiveCocoa Documentation

ReactiveCocoa raywenderlich上的資料

  1. 上一頁:
  2. 下一頁:
蘋果刷機越獄教程| IOS教程問題解答| IOS技巧綜合| IOS7技巧| IOS8教程
Copyright © Ios教程網 All Rights Reserved