summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--player/mruby/observable.mrb73
-rw-r--r--player/mruby/observable_test.rb27
2 files changed, 100 insertions, 0 deletions
diff --git a/player/mruby/observable.mrb b/player/mruby/observable.mrb
new file mode 100644
index 0000000000..3b97461dd0
--- /dev/null
+++ b/player/mruby/observable.mrb
@@ -0,0 +1,73 @@
+# inspired by toy-rx: https://github.com/staltz/toy-rx
+class Subscriber
+ attr_writer :subscription
+
+ def initialize(observer)
+ @observer = observer
+ end
+
+ def next(value)
+ @observer.next(value)
+ end
+
+ def error(error)
+ @observer.error(error)
+ end
+
+ def complete
+ @observer.complete
+ end
+
+ def unsubscribe
+ @subscription.unsubscribe unless @subscription.nil?
+ end
+end
+
+class Observable
+ def initialize(&subscribe)
+ @subscribe = subscribe
+ end
+
+ def self.create(&subscribe)
+ fail ArgumentError, "`create` needs a subscribe block" unless block_given?
+ new do |observer|
+ subscriber = Subscriber.new(observer)
+ subscription = subscribe.call(subscriber);
+ subscriber.subscription = subscription
+ subscription
+ end
+ end
+
+ # XXX probably nicer to define operators in another file
+ def map(&tx)
+ inobservable = self
+ Observable.create do |out|
+ observer = {
+ next: -> (x) {
+ begin
+ out.next(tx.call(x))
+ rescue => e
+ out.error(e)
+ end
+ },
+ error: -> (e) { out.error(e) },
+ complete: -> (e) { out.complete },
+ };
+ inobservable.subscribe(observer)
+ end
+ end
+
+ def subscribe(callbacks = {}, &nextcb)
+ callbacks[:next] = nextcb if block_given?
+ noop = -> () {}
+
+ klass = Class.new do
+ define_method(:next, callbacks.fetch(:next, noop))
+ define_method(:error, callbacks.fetch(:error, noop))
+ define_method(:complete, callbacks.fetch(:complete, noop))
+ end
+
+ observer = klass.new
+ @subscribe.call(observer)
+ end
+end
diff --git a/player/mruby/observable_test.rb b/player/mruby/observable_test.rb
new file mode 100644
index 0000000000..1a5cfcac6b
--- /dev/null
+++ b/player/mruby/observable_test.rb
@@ -0,0 +1,27 @@
+# require[_relative] doesn't like non .rb/.so/.a extensions in MRI
+load File.expand_path('./observable.mrb', __dir__)
+
+empty = Observable.create { |observer| observer.complete }
+value = -> (v) { Observable.create { |observer| observer.next(v) } }
+failure = Observable.create { |observer| observer.error("some error") }
+
+subscriptions = {
+ next: -> (x) { puts "next: '#{x}'" },
+ error: -> (x) { puts "error: '#{x}'" },
+ complete: -> () { puts "complete" }
+}
+
+empty.subscribe(subscriptions)
+value.("hello world").subscribe(subscriptions)
+value.("hello world").subscribe { |x| puts "block based subscription: '#{x}'" }
+value.(2).map { |x| x + 1 } .subscribe(subscriptions)
+failure.subscribe(subscriptions)
+failure # prints nothing, observables are lazy!
+
+# output will be:
+
+# complete
+# next: 'hello world'
+# block based sub hello world
+# next: '3'
+# error: 'some error'