Starlark
Intro#
The event-starlark
processor applies a Starlark
function on a list of event
messages before returning them to the processors pipeline and then to the output.
starlark
is a dialect of Python, developed initially for the Bazel build tool but found multiple uses as a configuration language embedded in a larger application.
There are a few differences between Python and Starlark, programs written in Starlark are supposed to be short-lived and have no external side effects, their main result is structured data or side effects on the host application. As a result, Starlark has no need for classes, exceptions, reflection, concurrency, and other such features of Python.
gNMIc
uses the Go implementation of Starlark.
A Starlark program running as a gNMIc
processor should define an apply
function that takes an arbitrary number of arguments of type Event
and returns zero or more Event
s.
An Event
is the transformed gNMI update message as gNMIc
processes it.
def apply(*events)
# events transformed/augmented/filtered here
return events
Configuration#
processors:
# processor name
sample-processor:
# processor type
event-starlark:
# the source of the starlark program.
source: |
def apply(*events):
# processor logic here
return events
# path to a file containing the starlark program to run.
# Mutually exclusive with `source` parameter.
script:
# boolean enabling extra logging
debug: false
Writing a Starlark processor#
To write a starlark processor all that is needed is writing a function called apply
that will read/modify/delete a list of Event
messages.
Starlark specification defines multiple builtin types and functions.
gNMIc
provides additional builtin functions like Event(name)
which creates a new Event
message and copy_event(Event)
which duplicates a given Event
message.
The Event
message comprises a few fields:
-
name
: string -
timestamp
: int64 -
tags
: dictionary of string to string -
values
: dictionary of string to any -
deletes
: list of strings
Starlark allows for the dynamic loading of other modules. In the context of gNMIc, the following two modules are available for loading within a starlark program:
-
time:
load("time.star", "time")
loads the time library which provides the following functions to work with theEvent
message timestamp field:-
time.from_timestamp(sec, nsec)
:Converts the given Unix time corresponding to the number of seconds and (optionally) nanoseconds since January 1, 1970 UTC into an object of type Time.
For more details, refer to https://pkg.go.dev/time#Unix.
-
time.is_valid_timezone(loc)
:Reports whether loc is a valid time zone name.
-
time.now()
:Returns the current local time.
-
time.parse_duration(d)
:Parses the given duration string.
For more details, refer to https://pkg.go.dev/time#ParseDuration.
-
time.parse_time(x, format, location)
:Parses the given time string using a specific time format and location. The expected arguments are a time string (mandatory), a time format (optional, set to RFC3339 by default, e.g. "2021-03-22T23:20:50.52Z") and a name of location (optional, set to UTC by default).
For more details, refer to https://pkg.go.dev/time#Parse and https://pkg.go.dev/time#ParseInLocation.
-
time.time(year, month, day, hour, minute, second, nanosecond, location)
:Returns the Time corresponding to
yyyy-mm-dd hh:mm:ss + nsec nanoseconds
in the appropriate zone for that time in the given location. All the parameters are optional.
-
-
math:
load("math.star", "math")
loads the math library which provides a set of constants and math-related functions:-
ceil(x)
:Returns the ceiling of x, the smallest integer greater than or equal to x.
-
copysign(x, y)
:Returns a value with the magnitude of x and the sign of y.
-
fabs(x)
:Returns the absolute value of x as float.
-
floor(x)
:Returns the floor of x, the largest integer less than or equal to x.
-
mod(x, y)
:Returns the floating-point remainder of x/y. The magnitude of the result is less than y and its sign agrees with that of x.
-
pow(x, y)
:Returns x**y, the base-x exponential of y.
-
remainder(x, y)
:Returns the IEEE 754 floating-point remainder of x/y.
-
round(x)
:Returns the nearest integer, rounding half away from zero.
-
exp(x)
:Returns e raised to the power x, where e = 2.718281… is the base of natural logarithms.
-
sqrt(x)
:Returns the square root of x.
-
acos(x)
:Returns the arc cosine of x, in radians.
-
asin(x)
:Returns the arc sine of x, in radians.
-
atan(x)
:Returns the arc tangent of x, in radians.
-
atan2(y, x)
:Returns atan(y / x), in radians. The result is between -pi and pi. The vector in the plane from the origin to point (x, y) makes this angle with the positive X axis. The point of atan2() is that the signs of both inputs are known to it, so it can compute the correct quadrant for the angle. For example, atan(1) and atan2(1, 1) are both pi/4, but atan2(-1, -1) is -3*pi/4.
-
cos(x)
:Returns the cosine of x, in radians.
-
hypot(x, y)
:Returns the Euclidean norm, sqrt(x*x + y*y). This is the length of the vector from the origin to point (x, y).
-
sin(x)
:Returns the sine of x, in radians.
-
tan(x)
:Returns the tangent of x, in radians.
-
degrees(x)
:Converts angle x from radians to degrees.
-
radians(x)
:Converts angle x from degrees to radians.
-
acosh(x)
:Returns the inverse hyperbolic cosine of x.
-
asinh(x)
:Returns the inverse hyperbolic sine of x.
-
atanh(x)
:Returns the inverse hyperbolic tangent of x.
-
cosh(x)
:Returns the hyperbolic cosine of x.
-
sinh(x)
:Returns the hyperbolic sine of x.
-
tanh(x)
:Returns the hyperbolic tangent of x.
-
log(x, base)
:Returns the logarithm of x in the given base, or natural logarithm by default.
-
gamma(x)
:Returns the Gamma function of x.
-
Examples#
Move a value to a tag#
def apply(*events):
dels = []
for e in events:
for k, v in e.values.items():
if k == "val1":
e.tags[k] = str(v)
dels.append(k)
for d in dels:
e.values.pop(d)
return events
Rename values#
val_map = {
"val1": "new_val",
}
def apply(*events):
for e in events:
for k, v in e.values.items():
if k in val_map:
e.values[val_map[k]] = v
e.values.pop(k)
return events
Convert strings to integers#
def apply(*events):
for e in events:
for k, v in e.values.items():
if v.isdigit():
e.values[k] = int(v)
return events
Set an interface description as a tag#
This script stores each interface description per target/interface in a cache and adds it to other values as a tag.
cache = {}
def apply(*events):
evs = []
# check if on the event messages contains an interface description
# and store in th cache dict
for e in events:
if e.values.get("/interface/description"):
target_if = e.tags["source"] + "_" + e.tags["interface_name"]
cache[target_if] = e.values["/interface/description"]
# for each event get the 'source' and 'interface_name', check
# if a corresponding cache entry exists and set it as a
# 'description' tag
for e in events:
if e.tags.get("source") and e.tags.get("interface_name"):
target_if = e.tags["source"] + "_" + e.tags["interface_name"]
if cache.get(target_if):
e.tags["description"] = cache[target_if]
evs.append(e)
return evs
Calculate new values based on the received ones#
The below script calculates the avg, min, max of a list of values over their last N=10 values
cache = {}
values_names = [
'/interface/statistics/out-octets',
'/interface/statistics/in-octets'
]
N=10
def apply(*events):
for e in events:
for value_name in values_names:
v = e.values.get(value_name)
# check if v is not None and is a digit to proceed
if not v.isdigit():
continue
# update cache with the latest value
val_key = "_".join([e.tags["source"], e.tags["interface_name"], value_name])
if not cache.get(val_key):
# initialize the cache entry if empty
cache.update({val_key: []})
if len(cache[val_key]) >= N:
# remove the oldest entry if the number of entries reached N
cache[val_key] = cache[val_key][1:]
# update cache entry
cache[val_key].append(int(v))
# get the list of values
val_list = cache[val_key]
# calculate min, max and avg
e.values[value_name+"_min"] = min(val_list)
e.values[value_name+"_max"] = max(val_list)
e.values[value_name+"_avg"] = avg(val_list)
return events
def avg(vals):
sum = 0
for v in vals:
sum = sum + v
return sum/len(vals)
The below script builds on top of the previous one by adding the rate calculation to the added values. Now the cache contains a timestamp as well as the value.
cache = {}
values_names=[
'/interface/statistics/out-octets',
'/interface/statistics/in-octets'
]
N=10
def apply(*events):
for e in events:
for value_name in values_names:
v = e.values.get(value_name)
# check if v is not None and is a digit to proceed
if not v.isdigit():
continue
# update cache with the latest value
val_key = "_".join([e.tags["source"], e.tags["interface_name"], value_name])
if not cache.get(val_key):
# initialize the cache entry if empty
cache.update({val_key: []})
if len(cache[val_key]) >= N:
# remove the oldest entry if the number of entries reached N
cache[val_key] = cache[val_key][1:]
# update cache entry
cache[val_key].append((e.timestamp, int(v)))
# get the list of values
val_list = cache[val_key]
# calculate min, max and avg
vals = [x[1] for x in val_list]
e.values[value_name+"_min"] = min(vals)
e.values[value_name+"_max"] = max(vals)
e.values[value_name+"_avg"] = avg(vals)
if len(val_list) > 1:
e.values[value_name+"_rate"] = rate(val_list[-2:])
return events
def avg(vals):
sum = 0
for v in vals:
sum = sum + v
return sum/len(vals)
def rate(vals):
period = (vals[1][0] - vals[0][0]) / 1000000000
change = vals[1][1] - vals[0][1]
return change / period
Ungroup values#
The below script ungroups values part of the same event message producing an event message per value.
def apply(*events):
ungrouped_events = []
for e in events:
for k, v in e.values.items():
# create a new event without any value
new_event = Event(e.name, e.timestamp, e.tags)
# add a single value to the new event
new_event.values[k] = v
# add the new event to the array
ungrouped_events.append(new_event)
return ungrouped_events