Observables

Observables are like Refs:

julia> using Observables

julia> observable = Observable(0)
Observable{Int64} with 0 listeners. Value:
0

julia> observable[]
0

But unlike Refs, you can listen for changes:

julia> obs_func = on(observable) do val
           println("Got an update: ", val)
       end
(::Observables.ObserverFunction) (generic function with 0 methods)

julia> observable[] = 42
Got an update: 42
42

To remove a handler, call off with the return value of on:

julia> off(obs_func)
true

Weak Connections

If you use on with weak = true, the connection will be removed when the return value of on is garbage collected. This can make it easier to clean up connections that are not used anymore.

obs_func = on(observable, weak = true) do val
    println("Got an update: ", val)
end
# as long as obs_func is reachable the connection will stay

obs_func = nothing
# now garbage collection can at any time clear the connection

Async operations

Delay an update

julia> x = Observable(1)
Observable{Int64} with 0 listeners. Value:
1

julia> y = map(x) do val
           @async begin
               sleep(1.5)
               return val + 1
           end
       end
Observable{Any} with 0 listeners. Value:
not assigned yet!

julia> tstart = time()
1.61399743663802e9

julia> onany(x, y) do xval, yval
           println("At ", time()-tstart, ", we have x = ", xval, " and y = ", yval)
       end
2-element Array{Observables.ObserverFunction,1}:
 Observables.ObserverFunction(Observables.OnUpdate{Main.ex-manual.var"#7#8",Tuple{Observable{Int64},Observable{Any}}}(Main.ex-manual.var"#7#8"(), (Observable{Int64} with 2 listeners. Value:
1, Observable{Any} with 1 listeners. Value:
not assigned yet!)), Observable{Int64} with 2 listeners. Value:
1, false)
 Observables.ObserverFunction(Observables.OnUpdate{Main.ex-manual.var"#7#8",Tuple{Observable{Int64},Observable{Any}}}(Main.ex-manual.var"#7#8"(), (Observable{Int64} with 2 listeners. Value:
1, Observable{Any} with 1 listeners. Value:
not assigned yet!)), Observable{Any} with 1 listeners. Value:
not assigned yet!, false)

julia> sleep(3)
At 1.4774608612060547, we have x = 1 and y = 2

julia> x[] = 5
At 3.6661758422851562, we have x = 5 and y = 2
5

julia> sleep(3)
At 5.169271945953369, we have x = 5 and y = 6

Multiply updates

If you want to fire several events on an update (e.g., for interpolating animations), you can use a channel:

julia> x = Observable(1)
Observable{Int64} with 0 listeners. Value:
1

julia> y = map(x) do val
           Channel() do channel
               for i in 1:10
                   put!(channel, i + val)
               end
           end
       end; on(y) do val
           println("updated to ", val)
       end; sleep(2)
updated to 2
updated to 3
updated to 4
updated to 5
updated to 6
updated to 7
updated to 8
updated to 9
updated to 10
updated to 11

Similarly, you can construct the Observable from a Channel:

Observable(Channel() do channel
    for i in 1:10
        put!(channel, i + 1)
    end
end)

How is it different from Reactive.jl?

The main difference is that Signals are manipulated mostly by converting one signal to another. For example, with signals you can construct a changing UI by creating a Signal of UI objects and rendering them as the signal changes. An Observable, on the other hand, can be used both as an input and as an output. You can attach outputs to inputs arbitrarily, which lets you structure code in a signals-and-slots pattern.

Another difference is that Observables are synchronous, whereas Signals are asynchronous. Observables may be better suited to an imperative style of programming.
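
The synchronous behaviour described above can be sketched as follows. This is a minimal illustration that assumes only the on and observable[] = val calls documented on this page; the names events and obs are illustrative. The listener runs to completion before the assignment returns, and the same Observable is both written to and read from.

```julia
using Observables

events = String[]        # illustrative record of the order of events
obs = Observable(0)

# The listener runs inside `obs[] = ...`, not on a separate task.
on(obs) do val
    push!(events, "listener saw $val")
end

obs[] = 1                              # used as an input
push!(events, "assignment returned")

println(obs[])                         # used as an output
println(events)
```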

API

Public

Observables.async_latest - Method
async_latest(observable::AbstractObservable, n=1)

Returns an Observable which drops all but the last n updates to observable if processing the updates takes longer than the interval between updates.

This is useful if you want to pass the updates from, say, a slider to a plotting function that takes a while to compute. The plot will directly compute the last frame skipping the intermediate ones.

Example:

observable = Observable(0)
function compute_something(x)
    for i=1:10^8 rand() end # simulate something expensive
    println("updated with $x")
end
o_latest = async_latest(observable, 1)
on(compute_something, o_latest) # compute something on the latest update

for i=1:5
    observable[] = i
end

Observables.observe_changes - Method
obs = observe_changes(arg::AbstractObservable, eq=(==))

Returns an Observable which updates with the value of arg whenever the new value differs from the current value of obs according to the equality operator eq.

Example:

julia> obs = Observable(0);

julia> obs_change = observe_changes(obs);

julia> on(obs) do o
           println("obs[] == $o")
       end;

julia> on(obs_change) do o
           println("obs_change[] == $o")
       end;

julia> obs[] = 0;
obs[] == 0

julia> obs[] = 1;
obs_change[] == 1
obs[] == 1

julia> obs[] = 1;
obs[] == 1
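
The eq argument can also be a custom comparison. The sketch below assumes the observe_changes(arg, eq) signature shown above; roughly_equal and its 0.5 tolerance are hypothetical names and values chosen for illustration. Note that each new value is compared against the current value of the filtered observable, not against the previous value of the input.

```julia
using Observables

obs = Observable(1.0)

# Hypothetical tolerance-based comparison: values within 0.5 count as equal.
roughly_equal(a, b) = abs(a - b) < 0.5

obs_change = Observables.observe_changes(obs, roughly_equal)

seen = Float64[]
on(obs_change) do val
    push!(seen, val)
end

obs[] = 1.2   # within 0.5 of obs_change[] (1.0): not forwarded
obs[] = 2.0   # differs from 1.0 by 1.0: forwarded
obs[] = 2.3   # within 0.5 of obs_change[] (now 2.0): not forwarded

println(seen)
```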

Observables.off - Method
off(observable::AbstractObservable, f)

Removes f from listeners of observable.

Returns true if f could be removed, otherwise false.
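
A minimal sketch of this two-argument form, assuming the listener was registered without weak = true. The names record and calls are illustrative; a named function is used so that the same object can be passed to off later.

```julia
using Observables

obs = Observable(0)
calls = Int[]
record(val) = push!(calls, val)   # named listener, so it can be removed by identity

on(record, obs)
obs[] = 1                         # `record` runs

removed = off(obs, record)        # `record` was registered, so it is removed
removed_again = off(obs, record)  # nothing left to remove

obs[] = 2                         # no listener runs
println((removed, removed_again, calls))
```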

Observables.off - Method
off(obsfunc::ObserverFunction)

Remove the listener function obsfunc.f from the listeners of obsfunc.observable. Once obsfunc goes out of scope, this should allow obsfunc.f and all the values it might have closed over to be garbage collected (unless there are other references to it).

Observables.on - Method
on(f, observable::AbstractObservable; weak = false)

Adds function f as listener to observable. Whenever observable's value is set via observable[] = val, f is called with val.

Returns an ObserverFunction that wraps f and observable, so you can disconnect easily by calling off(observerfunction) instead of off(observable, f). If instead you want to compute a new Observable from an old one, use map(f, ::Observable).

If weak = true is set, the new connection will be removed as soon as the returned ObserverFunction is not referenced anywhere and is garbage collected. This is useful if some parent object makes connections to outside observables and stores the resulting ObserverFunction instances. Then, once that parent object is garbage collected, the weak observable connections are removed automatically.

Example

julia> obs = Observable(0)
Observable{Int64} with 0 listeners. Value:
0

julia> on(obs) do val
           println("current value is ", val)
       end
(::Observables.ObserverFunction) (generic function with 0 methods)

julia> obs[] = 5;
current value is 5

Observables.onany - Method
onany(f, args...)

Calls f on updates to any observable refs in args. args may contain any number of Observable objects. f will be passed the values contained in the refs as the respective argument. All other objects in args are passed as-is.

See also: on.
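
A minimal sketch of mixing observables with a plain value, based on the description above. The names a, b, and totals are illustrative.

```julia
using Observables

a = Observable(1)
b = Observable(2)
totals = Int[]

# `10` is not an Observable, so it is passed to the callback unchanged.
obsfuncs = onany(a, b, 10) do x, y, offset
    push!(totals, x + y + offset)
end

a[] = 5   # callback receives (5, 2, 10)
b[] = 0   # callback receives (5, 0, 10)

println(totals)
```

onany returns one ObserverFunction per observable argument, so each connection can later be released with off.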

Observables.throttle - Method
throttle(dt, input::AbstractObservable)

Throttle a signal to update at most once every dt seconds. The throttled signal holds the last update of the input signal during each dt second time window.
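
A rough usage sketch, assuming the throttle(dt, input) signature above. The slider observable and the chosen intervals are hypothetical. How many updates get through depends on timing, so no output is shown.

```julia
using Observables

slider = Observable(0)                          # hypothetical fast-changing input
throttled = Observables.throttle(0.2, slider)   # at most one update every 0.2 seconds

on(throttled) do val
    println("throttled value: ", val)
end

# Update the input much faster than the throttle interval.
for i in 1:20
    slider[] = i
    sleep(0.02)
end
sleep(0.5)   # leave time for any pending throttled update to arrive
```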


Extensions of Base methods or internal methods

Observables.ObserverFunction - Type
mutable struct ObserverFunction <: Function

Fields:

f::Function
observable::AbstractObservable
weak::Bool

ObserverFunction is intended as the return value for on because we can remove the created closure from obsfunc.observable's listener vectors when ObserverFunction goes out of scope - as long as the weak flag is set. If the weak flag is not set, nothing happens when the ObserverFunction goes out of scope and it can be safely ignored. It can still be useful because it is easier to call off(obsfunc) instead of off(observable, f) to release the connection later.
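
The convenience of off(obsfunc) can be sketched as follows. This is an illustration rather than a prescribed pattern: the connections vector stands in for whatever a parent object would store, and temperature, pressure, and received are hypothetical names.

```julia
using Observables

received = Any[]
temperature = Observable(20.0)
pressure = Observable(1.0)

# Keep every ObserverFunction returned by `on` in one place.
connections = Observables.ObserverFunction[]
for source in (temperature, pressure)
    obsfunc = on(source) do val
        push!(received, val)
    end
    push!(connections, obsfunc)
end

temperature[] = 21.5          # listener runs

# Release all connections without tracking (observable, f) pairs by hand.
foreach(off, connections)
empty!(connections)

pressure[] = 2.0              # no listener runs any more
println(received)
```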

Base.map! - Method
map!(f, observable::AbstractObservable, args...; update::Bool=true)

Updates observable with the result of calling f with values extracted from args. args may contain any number of Observable objects. f will be passed the values contained in the refs as the respective argument. All other objects in args are passed as-is.

By default observable gets updated immediately, but this can be suppressed by specifying update=false.

Example

We'll create an observable that can hold an arbitrary number:

julia> obs = Observable{Number}(3)
Observable{Number} with 0 listeners. Value:
3

Now,

julia> obsrt1 = map(sqrt, obs)
Observable{Float64} with 0 listeners. Value:
1.7320508075688772

creates an Observable{Float64}, which will fail to update if we set obs[] = 3+4im. However,

julia> obsrt2 = map!(sqrt, Observable{Number}(), obs)
Observable{Number} with 0 listeners. Value:
1.7320508075688772

can handle any number type for which sqrt is defined.
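
The update keyword described above can be sketched as follows, assuming the map! signature shown at the top of this entry. The names src and dest are illustrative. With update = false, dest keeps its existing value until src next changes.

```julia
using Observables

src = Observable(2)
dest = Observable(0)

# Connect dest to src without computing an initial value.
map!(x -> x^2, dest, src; update = false)
before = dest[]   # unchanged by the call to map!

src[] = 3
after = dest[]    # now reflects f applied to the new value of src

println((before, after))
```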

Base.map - Method
obs = map(f, arg1::AbstractObservable, args...)

Creates a new observable ref obs which contains the result of f applied to values extracted from arg1 and args (i.e., f(arg1[], ...)). arg1 must be an observable ref for dispatch reasons. args may contain any number of Observable objects. f will be passed the values contained in the refs as the respective argument. All other objects in args are passed as-is.

If you don't need the value of obs, and just want to run f whenever the arguments update, use on or onany instead.

Example

julia> obs = Observable([1,2,3]);

julia> map(length, obs)
Observable{Int64} with 0 listeners. Value:
3

Base.notify - Method
notify(observable::AbstractObservable)

Update all listeners of observable.
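
One situation where this helps is in-place mutation of the stored value, which does not go through observable[] = val and so does not call any listeners. A minimal sketch, with illustrative names items and lengths:

```julia
using Observables

items = Observable([1, 2, 3])
lengths = Int[]
on(items) do v
    push!(lengths, length(v))
end

push!(items[], 4)   # mutates the stored vector in place; no listener is called
notify(items)       # listeners now run with the current (mutated) value

println(lengths)
```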

Base.setindex! - Method
observable[] = val

Updates the value of an Observable to val and calls its listeners.

Observables.to_value - Method
to_value(x::Union{Any, AbstractObservable})

Extracts the value of an observable. If x is not an observable, returns x itself.
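
This makes it possible to write functions that accept either kind of argument. A minimal sketch, where scaled is a hypothetical helper:

```julia
using Observables

# Hypothetical helper that accepts either a plain number or an Observable.
scaled(x, factor) = Observables.to_value(x) * factor

from_observable = scaled(Observable(3), 2)
from_plain = scaled(3, 2)

println((from_observable, from_plain))
```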

Observables.@map! - Macro
@map!(d, expr)

Wrap AbstractObservables in & to compute expression expr using their value: the expression will be computed every time the AbstractObservables are updated and d will be set to match that value.

Examples

julia> a = Observable(2);

julia> b = Observable(3);

julia> c = Observable(10);

julia> Observables.@map! c &a + &b;

julia> c[]
10

julia> a[] = 100
100

julia> c[]
103

Observables.@map - Macro
@map(expr)

Wrap AbstractObservables in & to compute expression expr using their value. The expression will be computed when @map is called and every time the AbstractObservables are updated.

Examples

julia> a = Observable(2);

julia> b = Observable(3);

julia> c = Observables.@map &a + &b;

julia> c[]
5

julia> a[] = 100
100

julia> c[]
103

Observables.@on - Macro
@on(expr)

Wrap AbstractObservables in & to execute expression expr using their value. The expression will be computed every time the AbstractObservables are updated.

Examples

julia> a = Observable(2);

julia> b = Observable(3);

julia> Observables.@on println("The sum of a+b is $(&a + &b)");

julia> a[] = 100;
The sum of a+b is 103