Observables
Observables are like Ref
s:
julia> using Observables
julia> observable = Observable(0)
Observable{Int64} with 0 listeners. Value:
0
julia> observable[]
0
But unlike Ref
s, but you can listen for changes:
julia> obs_func = on(observable) do val
println("Got an update: ", val)
end
(::Observables.ObserverFunction) (generic function with 0 methods)
julia> observable[] = 42
Got an update: 42
42
To remove a handler use off
with the return value of on
:
julia> off(obs_func)
true
Weak Connections
If you use on
with weak = true
, the connection will be removed when the return value of on
is garbage collected. This can make it easier to clean up connections that are not used anymore.
obs_func = on(observable, weak = true) do val
println("Got an update: ", val)
end
# as long as obs_func is reachable the connection will stay
obs_func = nothing
# now garbage collection can at any time clear the connection
Async operations
Delay an update
julia> x = Observable(1)
Observable{Int64} with 0 listeners. Value:
1
julia> y = map(x) do val
@async begin
sleep(1.5)
return val + 1
end
end
Observable{Any} with 0 listeners. Value:
not assigned yet!
julia> tstart = time()
1.61399743663802e9
julia> onany(x, y) do xval, yval
println("At ", time()-tstart, ", we have x = ", xval, " and y = ", yval)
end
2-element Array{Observables.ObserverFunction,1}:
Observables.ObserverFunction(Observables.OnUpdate{Main.ex-manual.var"#7#8",Tuple{Observable{Int64},Observable{Any}}}(Main.ex-manual.var"#7#8"(), (Observable{Int64} with 2 listeners. Value:
1, Observable{Any} with 1 listeners. Value:
not assigned yet!)), Observable{Int64} with 2 listeners. Value:
1, false)
Observables.ObserverFunction(Observables.OnUpdate{Main.ex-manual.var"#7#8",Tuple{Observable{Int64},Observable{Any}}}(Main.ex-manual.var"#7#8"(), (Observable{Int64} with 2 listeners. Value:
1, Observable{Any} with 1 listeners. Value:
not assigned yet!)), Observable{Any} with 1 listeners. Value:
not assigned yet!, false)
julia> sleep(3)
At 1.4774608612060547, we have x = 1 and y = 2
julia> x[] = 5
At 3.6661758422851562, we have x = 5 and y = 2
5
julia> sleep(3)
At 5.169271945953369, we have x = 5 and y = 6
Multiply updates
If you want to fire several events on an update (e.g., for interpolating animations), you can use a channel:
julia> x = Observable(1)
Observable{Int64} with 0 listeners. Value:
1
julia> y = map(x) do val
Channel() do channel
for i in 1:10
put!(channel, i + val)
end
end
end; on(y) do val
println("updated to ", val)
end; sleep(2)
updated to 2
updated to 3
updated to 4
updated to 5
updated to 6
updated to 7
updated to 8
updated to 9
updated to 10
updated to 11
Similarly, you can construct the Observable from a Channel
:
Observable(Channel() do channel
for i in 1:10
put!(channel, i + 1)
end
end)
How is it different from Reactive.jl?
The main difference is Signal
s are manipulated mostly by converting one signal to another. For example, with signals, you can construct a changing UI by creating a Signal
of UI objects and rendering them as the signal changes. On the other hand, you can use an Observable both as an input and an output. You can arbitrarily attach outputs to inputs allowing structuring code in a signals-and-slots kind of pattern.
Another difference is Observables are synchronous, Signals are asynchronous. Observables may be better suited for an imperative style of programming.
API
Public
Observables.Observable
— Typeobs = Observable(val)
obs = Observable{T}(val)
Like a Ref
, but updates can be watched by adding a handler using on
or map
.
Observables.async_latest
— Methodasync_latest(observable::AbstractObservable, n=1)
Returns an Observable
which drops all but the last n
updates to observable
if processing the updates takes longer than the interval between updates.
This is useful if you want to pass the updates from, say, a slider to a plotting function that takes a while to compute. The plot will directly compute the last frame skipping the intermediate ones.
Example:
observable = Observable(0)
function compute_something(x)
for i=1:10^8 rand() end # simulate something expensive
println("updated with $x")
end
o_latest = async_latest(observable, 1)
on(compute_something, o_latest) # compute something on the latest update
for i=1:5
observable[] = i
end
Observables.connect!
— Methodconnect!(o1::AbstractObservable, o2::AbstractObservable)
Forwards all updates from o2
to o1
.
See also Observables.ObservablePair
.
Observables.observe_changes
— Methodobs = observe_changes(arg::AbstractObservable, eq=(==))
Returns an Observable
which updates with the value of arg
whenever the new value differs from the current value of obs
according to the equality operator eq
.
Example:
julia> obs = Observable(0);
julia> obs_change = observe_changes(obs);
julia> on(obs) do o
println("obs[] == $o")
end;
julia> on(obs_change) do o
println("obs_change[] == $o")
end;
julia> obs[] = 0;
obs[] == 0
julia> obs[] = 1;
obs_change[] == 1
obs[] == 1
julia> obs[] = 1;
obs[] == 1
Observables.obsid
— Methodobsid(observable::Observable)
Gets a unique id for an observable.
Observables.off
— Methodoff(observable::AbstractObservable, f)
Removes f
from listeners of observable
.
Returns true
if f
could be removed, otherwise false
.
Observables.off
— Methodoff(obsfunc::ObserverFunction)
Remove the listener function obsfunc.f
from the listeners of obsfunc.observable
. Once obsfunc
goes out of scope, this should allow obsfunc.f
and all the values it might have closed over to be garbage collected (unless there are other references to it).
Observables.on
— Methodon(f, observable::AbstractObservable; weak = false)
Adds function f
as listener to observable
. Whenever observable
's value is set via observable[] = val
, f
is called with val
.
Returns an ObserverFunction
that wraps f
and observable
and allows to disconnect easily by calling off(observerfunction)
instead of off(f, observable)
. If instead you want to compute a new Observable
from an old one, use map(f, ::Observable)
.
If weak = true
is set, the new connection will be removed as soon as the returned ObserverFunction
is not referenced anywhere and is garbage collected. This is useful if some parent object makes connections to outside observables and stores the resulting ObserverFunction
instances. Then, once that parent object is garbage collected, the weak observable connections are removed automatically.
Example
julia> obs = Observable(0)
Observable{Int64} with 0 listeners. Value:
0
julia> on(obs) do val
println("current value is ", val)
end
(::Observables.ObserverFunction) (generic function with 0 methods)
julia> obs[] = 5;
current value is 5
Observables.onany
— Methodonany(f, args...)
Calls f
on updates to any observable refs in args
. args
may contain any number of Observable
objects. f
will be passed the values contained in the refs as the respective argument. All other objects in args
are passed as-is.
See also: on
.
Observables.throttle
— Methodthrottle(dt, input::AbstractObservable)
Throttle a signal to update at most once every dt
seconds. The throttled signal holds the last update of the input
signal during each dt
second time window.
Extensions of Base methods or internal methods
Observables.ObserverFunction
— Typemutable struct ObserverFunction <: Function
Fields:
f::Function
observable::AbstractObservable
weak::Bool
ObserverFunction
is intended as the return value for on
because we can remove the created closure from obsfunc.observable
's listener vectors when ObserverFunction goes out of scope - as long as the weak
flag is set. If the weak
flag is not set, nothing happens when the ObserverFunction goes out of scope and it can be safely ignored. It can still be useful because it is easier to call off(obsfunc)
instead of off(observable, f)
to release the connection later.
Base.getindex
— Methodobservable[]
Returns the current value of observable
.
Base.map!
— Methodmap!(f, observable::AbstractObservable, args...; update::Bool=true)
Updates observable
with the result of calling f
with values extracted from args. args
may contain any number of Observable
objects. f
will be passed the values contained in the refs as the respective argument. All other objects in args
are passed as-is.
By default observable
gets updated immediately, but this can be suppressed by specifying update=false
.
Example
We'll create an observable that can hold an arbitrary number:
julia> obs = Observable{Number}(3)
Observable{Number} with 0 listeners. Value:
3
Now,
julia> obsrt1 = map(sqrt, obs)
Observable{Float64} with 0 listeners. Value:
1.7320508075688772
creates an Observable{Float64}
, which will fail to update if we set obs[] = 3+4im
. However,
julia> obsrt2 = map!(sqrt, Observable{Number}(), obs)
Observable{Number} with 0 listeners. Value:
1.7320508075688772
can handle any number type for which sqrt
is defined.
Base.map
— Methodobs = map(f, arg1::AbstractObservable, args...)
Creates a new observable ref obs
which contains the result of f
applied to values extracted from arg1
and args
(i.e., f(arg1[], ...)
. arg1
must be an observable ref for dispatch reasons. args
may contain any number of Observable
objects. f
will be passed the values contained in the refs as the respective argument. All other objects in args
are passed as-is.
If you don't need the value of obs
, and just want to run f
whenever the arguments update, use on
or onany
instead.
Example
julia> obs = Observable([1,2,3]);
julia> map(length, obs)
Observable{Int64} with 0 listeners. Value:
3
Base.notify
— Methodnotify(observable::AbstractObservable)
Update all listeners of observable
.
Base.setindex!
— Methodobservable[] = val
Updates the value of an Observable
to val
and call its listeners.
Observables.to_value
— Methodto_value(x::Union{Any, AbstractObservable})
Extracts the value of an observable, and returns the object if it's not an observable!
Observables.@map!
— Macro@map!(d, expr)
Wrap AbstractObservables
in &
to compute expression expr
using their value: the expression will be computed every time the AbstractObservables
are updated and d
will be set to match that value.
Examples
julia> a = Observable(2);
julia> b = Observable(3);
julia> c = Observable(10);
julia> Observables.@map! c &a + &b;
julia> c[]
10
julia> a[] = 100
100
julia> c[]
103
Observables.@map
— Macro@map(expr)
Wrap AbstractObservables
in &
to compute expression expr
using their value. The expression will be computed when @map
is called and every time the AbstractObservables
are updated.
Examples
julia> a = Observable(2);
julia> b = Observable(3);
julia> c = Observables.@map &a + &b;
julia> c[]
5
julia> a[] = 100
100
julia> c[]
103
Observables.@on
— Macro@on(expr)
Wrap AbstractObservables
in &
to execute expression expr
using their value. The expression will be computed every time the AbstractObservables
are updated.
Examples
julia> a = Observable(2);
julia> b = Observable(3);
julia> Observables.@on println("The sum of a+b is $(&a + &b)");
julia> a[] = 100;
The sum of a+b is 103