# inspired by toy-rx: https://github.com/staltz/toy-rx

# Sits between a producer and a downstream observer: forwards
# next/error/complete notifications and keeps a handle on the subscription
# so it can be torn down later.
class Subscriber
  attr_writer :subscription

  # @param observer [#next, #error, #complete] downstream receiver
  def initialize(observer)
    @observer = observer
  end

  # Forwards an emitted value downstream.
  def next(value)
    @observer.next(value)
  end

  # Forwards an error downstream.
  def error(error)
    @observer.error(error)
  end

  # Forwards the completion signal downstream.
  def complete
    @observer.complete
  end

  # Tears down the stored subscription, if there is one.
  #
  # `Observable.create` stores whatever the subscribe block returned, which
  # may be nil or an arbitrary value (a Ruby block always returns its last
  # expression), so only call `unsubscribe` when the object supports it.
  def unsubscribe
    @subscription.unsubscribe if @subscription.respond_to?(:unsubscribe)
  end
end

# A minimal push-based observable: it stores a subscribe block and runs it
# once for every call to #subscribe.
class Observable
  # Adapts a hash of callables (:next, :error, :complete) to the observer
  # interface. Callbacks are invoked with `call`, so they keep the `self`
  # and closure of the place where they were written.
  class CallbackObserver
    # Default callback: accepts any arguments and does nothing.
    NOOP = ->(*) {}

    # @param callbacks [Hash{Symbol => #call}] any of :next, :error, :complete
    def initialize(callbacks)
      @on_next = callbacks.fetch(:next, NOOP)
      @on_error = callbacks.fetch(:error, NOOP)
      @on_complete = callbacks.fetch(:complete, NOOP)
    end

    def next(*args)
      @on_next.call(*args)
    end

    def error(*args)
      @on_error.call(*args)
    end

    def complete(*args)
      @on_complete.call(*args)
    end
  end
  private_constant :CallbackObserver

  # @yieldparam observer [#next, #error, #complete] receiver of notifications
  def initialize(&subscribe)
    @subscribe = subscribe
  end

  # Builds an observable whose subscribe block receives a Subscriber wrapping
  # the real observer. The block's return value is stored on the subscriber
  # as its subscription and is also returned from #subscribe.
  #
  # @raise [ArgumentError] when no block is given
  # @return [Observable]
  def self.create(&subscribe)
    raise ArgumentError, "`create` needs a subscribe block" unless block_given?

    new do |observer|
      subscriber = Subscriber.new(observer)
      subscription = subscribe.call(subscriber)
      subscriber.subscription = subscription
      subscription
    end
  end

  # XXX probably nicer to define operators in another file

  # Returns a new observable that emits `tx.call(value)` for every value
  # emitted by this one. Errors and completion are passed through.
  #
  # @raise [ArgumentError] when no block is given
  # @return [Observable]
  def map(&tx)
    raise ArgumentError, "`map` needs a transform block" unless block_given?

    source = self
    Observable.create do |out|
      observer = {
        next: ->(value) {
          begin
            out.next(tx.call(value))
          rescue => e
            # Anything raised while transforming or delivering the value is
            # reported through the error channel instead of propagating.
            out.error(e)
          end
        },
        error: ->(e) { out.error(e) },
        # Completion carries no payload, so this callback takes no argument.
        complete: -> { out.complete }
      }
      source.subscribe(observer)
    end
  end

  # Runs the subscribe block against an observer built from the callbacks.
  #
  # @param callbacks [Hash{Symbol => #call}] optional :next, :error and
  #   :complete handlers; missing ones default to a no-op
  # @yield [value] shorthand for the :next handler; takes precedence over
  #   callbacks[:next]
  # @return whatever the subscribe block returns
  def subscribe(callbacks = {}, &nextcb)
    # merge instead of []= so the caller's hash is never modified
    handlers = block_given? ? callbacks.merge(next: nextcb) : callbacks
    @subscribe.call(CallbackObserver.new(handlers))
  end
end