Last modified: May 11, 2023

Application construction components - Altinn Platform Events

The Events component in the Altinn platform is built as an ASP.NET Core web API application, deployed as a Docker container to a Kubernetes cluster.

The Events component exposes public REST APIs for publishing, retrieving and subscribing to events.

Azure Functions are used internally to process and forward incoming cloud events to subscriber webhooks.

The following diagram illustrates the overall data flow.

Altinn Event Architecture

When a publish request is posted to the /app endpoint, the event will first be saved in the events-registration queue for operational resilience and flexibility.
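As a rough illustration of the publish step, the sketch below builds a CloudEvents 1.0 JSON body of the kind that could be posted to the /app endpoint. The helper name `build_cloud_event` and the example source, type and subject values are hypothetical placeholders; only the four required attributes (`id`, `source`, `specversion`, `type`) are taken from the CloudEvents specification. The HTTP call and authentication are left out.

```python
import json
import uuid
from datetime import datetime, timezone


def build_cloud_event(source: str, event_type: str, subject: str) -> dict:
    """Build a minimal CloudEvents 1.0 envelope (hypothetical helper)."""
    return {
        "specversion": "1.0",            # required by CloudEvents
        "id": str(uuid.uuid4()),         # required, unique per event
        "source": source,                # required
        "type": event_type,              # required
        "subject": subject,              # optional attribute
        "time": datetime.now(timezone.utc).isoformat(),  # optional attribute
    }


# Placeholder values, not taken from a real Altinn app.
event = build_cloud_event(
    source="https://example.org/org/my-app/instances/1234",
    event_type="app.instance.created",
    subject="/party/1234",
)
body = json.dumps(event)  # JSON body that would be sent in the POST request
```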

When an event retrieval request is received, the component responds with results from the internal relational database used for event persistence.

Public API

The following API controllers are defined

Internal API

The following endpoints are intended for internal use only.

Processing events

Processing a single incoming event

Sequence diagram - POST event

Controller details

Base API URL: /events/api/v1

AppController

The AppController in the Events component receives events from apps and other sources.

It verifies that the app is authorized to publish events for the given source before saving the event in a database for at least 90 days.

It also exposes APIs for searching for and retrieving events.

Access is controlled by the XACML policy for the app that is the source of a given event.

The AuthorizationHelper is responsible for creating and performing the request to the Policy Decision Point.

SubscriptionController

The SubscriptionController exposes APIs to:

  • Add subscriptions
  • Delete subscriptions
  • Get subscriptions
  • Validate subscriptions

InboundController

Internal API

A thin client to the events-inbound queue storage, with minimal processing logic.

OutboundController (previously PushController)

OutboundController is called by the EventsInbound function.

Based on details from the event, it identifies matching subscriptions.

For each match, it authorizes the subscriber against the Policy Decision Point.

The AuthorizationHelper is responsible for creating and performing the request to the Policy Decision Point.

Access is controlled by the XACML policy for the app that is the source of a given event.

If the subscriber is authorized, the event is added to the “events-outbound” queue and picked up by the EventsOutbound function (see below).
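The match-then-authorize step can be sketched as follows. This is a minimal illustration, not the actual controller logic: the matching rule (an unset filter matches anything, a set filter must be equal), the `is_authorized` callback standing in for the Policy Decision Point request, and the envelope field names are all assumptions. The `Subscription` fields mirror column names from the subscription table.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Subscription:
    id: int
    consumer: str
    endpointurl: str
    sourcefilter: Optional[str] = None
    subjectfilter: Optional[str] = None
    typefilter: Optional[str] = None


def matches(sub: Subscription, event: dict) -> bool:
    """Assumed rule: an unset filter matches anything, a set filter must be equal."""
    checks = (
        (sub.sourcefilter, event.get("source")),
        (sub.subjectfilter, event.get("subject")),
        (sub.typefilter, event.get("type")),
    )
    return all(flt is None or flt == value for flt, value in checks)


def route_event(
    event: dict,
    subscriptions: List[Subscription],
    is_authorized: Callable[[str, dict], bool],  # stand-in for the PDP request
    outbound_queue: list,
) -> None:
    """Queue one envelope per matching subscription whose consumer is authorized."""
    for sub in subscriptions:
        if matches(sub, event) and is_authorized(sub.consumer, event):
            # Hypothetical envelope shape; real field names may differ.
            outbound_queue.append(
                {"endpoint": sub.endpointurl, "consumer": sub.consumer, "cloudEvent": event}
            )
```

Passing the authorization check in as a callable keeps the sketch independent of how the XACML request is actually built.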

Event storage

To get the search capability needed for the Events component, we have chosen to use PostgreSQL.

PostgreSQL makes it possible to sort events by primary key and to search across all events, stored as serialized JSON documents, by properties such as subject or source.

The table structure

CREATE TABLE IF NOT EXISTS events.events
(
    sequenceno bigint NOT NULL DEFAULT nextval('events.events_sequenceno_seq1'::regclass),
    cloudevent jsonb NOT NULL,
    registeredtime timestamp with time zone DEFAULT (now() AT TIME ZONE 'utc'::text),
    CONSTRAINT events_pkey PRIMARY KEY (sequenceno)
);

CREATE TABLE IF NOT EXISTS events.subscription
(
    id bigint NOT NULL DEFAULT nextval('events.subscription_id_seq'::regclass),
    sourcefilter character varying COLLATE pg_catalog."default",
    subjectfilter character varying COLLATE pg_catalog."default",
    typefilter character varying COLLATE pg_catalog."default",
    consumer character varying COLLATE pg_catalog."default" NOT NULL,
    endpointurl character varying COLLATE pg_catalog."default" NOT NULL,
    createdby character varying COLLATE pg_catalog."default" NOT NULL,
    validated boolean NOT NULL,
    "time" timestamp with time zone NOT NULL,
    sourcefilterhash character varying(32) COLLATE pg_catalog."default",
    CONSTRAINT eventssubscription_pkey PRIMARY KEY (id)
);

Functions and procedures are used to add, delete and query data in the above tables. See all functions and stored procedures here.

Event sequencing

Events will be sequenced by the field sequenceno, which is also the primary key of the Events table.
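To illustrate why a monotonically increasing sequence number is useful, the sketch below pages through events in sequence order using the last seen `sequenceno` as a cursor. It works on an in-memory list of rows shaped like the events table, not on the database itself; the function name `events_after`, its parameters and the default page size are hypothetical.

```python
from typing import List, Optional


def events_after(
    rows: List[dict],
    after_sequenceno: int = 0,
    subject: Optional[str] = None,
    limit: int = 50,
) -> List[dict]:
    """Return up to `limit` rows with a higher sequenceno, in sequence order."""
    selected = [
        row
        for row in rows
        if row["sequenceno"] > after_sequenceno
        and (subject is None or row["cloudevent"].get("subject") == subject)
    ]
    selected.sort(key=lambda row: row["sequenceno"])
    return selected[:limit]


# Rows mimic the (sequenceno, cloudevent) columns of the events table.
rows = [
    {"sequenceno": 3, "cloudevent": {"id": "c", "subject": "/party/1"}},
    {"sequenceno": 1, "cloudevent": {"id": "a", "subject": "/party/1"}},
    {"sequenceno": 2, "cloudevent": {"id": "b", "subject": "/party/2"}},
]
page = events_after(rows, after_sequenceno=1)
```

A consumer that remembers the highest `sequenceno` it has processed can pass it back as the cursor on the next call and will not see the same event twice.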

Indexing

The events table has indexes on the columns cloudevent (gin index) and registeredtime (btree index).

Functions

As part of the Events component, there is a single Azure Function App with four functions that process cloud events at various stages, using Azure Queue Storage. Click on a name for the code.

  • EventsRegistration
    • dequeue from events-registration
    • store to database
    • forward to events-inbound queue
  • EventsInbound
    • dequeue from events-inbound
    • find valid subscriptions with filters that match event
    • forward a copy of event to events-outbound queue for each matching subscription
  • EventsOutbound
    • dequeue from events-outbound
    • POST cloud event to target webhook endpoint, retry as necessary
  • SubscriptionValidation
    • check that the user-defined webhook endpoint is ready to receive data
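The three queue stages listed above can be sketched with in-memory queues. This only models the hand-offs between stages: the real functions call the platform controllers rather than touching the queues and database directly, and the `find_matching_endpoints` and `post` callbacks are hypothetical stand-ins for subscription matching and the webhook call.

```python
from collections import deque

QUEUE_NAMES = ("events-registration", "events-inbound", "events-outbound")


def events_registration(queues: dict, database: list) -> None:
    """Dequeue from events-registration, store, forward to events-inbound."""
    event = queues["events-registration"].popleft()
    database.append(event)
    queues["events-inbound"].append(event)


def events_inbound(queues: dict, find_matching_endpoints) -> None:
    """Dequeue from events-inbound and queue one copy per matching subscription."""
    event = queues["events-inbound"].popleft()
    for endpoint in find_matching_endpoints(event):
        queues["events-outbound"].append({"endpoint": endpoint, "event": dict(event)})


def events_outbound(queues: dict, post):
    """Dequeue from events-outbound and hand the event to the webhook sender."""
    item = queues["events-outbound"].popleft()
    return post(item["endpoint"], item["event"])


queues = {name: deque() for name in QUEUE_NAMES}
database = []
queues["events-registration"].append({"id": "1", "type": "app.instance.created"})
events_registration(queues, database)
events_inbound(queues, lambda event: ["https://subscriber.example/hook"])
status = events_outbound(queues, lambda url, event: 200)  # fake webhook call
```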

EventsRegistration

The EventsRegistration function is executed automatically by the Azure Function runtime when new events are posted to the events-registration queue.

It first sends the event to storage for persistence. Once the event has been successfully persisted, it is forwarded to the inbound controller. The function uses a platform access token to authenticate itself to the Inbound and Storage controllers.

It uses the standard retry mechanism if a call to the controllers fails.

EventsInbound

The EventsInbound function is executed automatically by the Azure Function runtime when new events are posted to the events-inbound queue.

It forwards the event to the PushController (now OutboundController) through the pushEventService.

The function uses a platform access token to authenticate itself to the PushController.

It uses the standard retry mechanism if the call to the PushController fails.

EventsOutbound

The EventsOutbound function is triggered by Queue Storage changes in the “events-outbound” queue.

It tries to push the event to the subscription endpoint given in the CloudEventEnvelope, which is put on the queue and contains the event.

This function is configured with CustomQueueProcessorFactory to handle retries when the event cannot be pushed to the endpoint.

It tries to send the event right away, but if the request to the webhook fails (HTTP status != 200), it puts the cloud event back on the queue with a defined wait time:

  1. retries after 10 seconds
  2. retries after 30 seconds
  3. retries after 1 minute
  4. retries after 5 minutes
  5. retries after 10 minutes
  6. retries after 30 minutes
  7. retries after 1 hour
  8. retries after 3 hours
  9. retries after 6 hours
  10. retries after 12 hours
  11. retries after 12 hours

If it fails the 12th time, it puts the event in the dead letter queue and does not try again.
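The retry schedule above can be expressed as a small lookup. This is a sketch of the schedule only, not of CustomQueueProcessorFactory itself; the names `RETRY_DELAYS_SECONDS` and `next_action` are hypothetical, and how the delay is applied to the queue message is not modelled.

```python
from typing import Optional, Tuple

# Wait times from the list above, in seconds, one per retry (11 retries).
RETRY_DELAYS_SECONDS = [10, 30, 60, 300, 600, 1800, 3600, 10800, 21600, 43200, 43200]


def next_action(failed_attempts: int) -> Tuple[str, Optional[int]]:
    """Decide what to do after the given number of failed delivery attempts."""
    if failed_attempts < 1:
        raise ValueError("failed_attempts must be at least 1")
    if failed_attempts <= len(RETRY_DELAYS_SECONDS):
        # The Nth failure is followed by the Nth wait time in the schedule.
        return ("retry", RETRY_DELAYS_SECONDS[failed_attempts - 1])
    # The 12th failure (initial attempt plus 11 retries) ends in the dead letter queue.
    return ("dead-letter", None)


total_wait_seconds = sum(RETRY_DELAYS_SECONDS)
```

With this schedule the waits add up to 125,200 seconds, so an unreachable endpoint is retried over just under 35 hours before the event is dead-lettered.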

SubscriptionValidation

The SubscriptionValidation function is triggered by Queue Storage changes in the “subscription-validation” queue.

It tries to validate the endpoint given in the subscription that is put on the queue.

This function is configured with CustomQueueProcessorFactory to handle retries when the event cannot be pushed to the endpoint.

It tries to send the event right away, but if the request to the webhook fails (HTTP status != 200), it puts the cloud event back on the queue with a defined wait time:

  1. retries after 10 seconds
  2. retries after 30 seconds
  3. retries after 1 minute
  4. retries after 5 minutes
  5. retries after 10 minutes
  6. retries after 30 minutes
  7. retries after 1 hour
  8. retries after 3 hours
  9. retries after 6 hours
  10. retries after 12 hours
  11. retries after 12 hours

If it fails the 12th time, it puts the event in the dead letter queue and does not try again.

If the endpoint responds with 200 OK, the function sets the subscription status to valid by calling the validate endpoint in the Subscription API.
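The validation outcome can be sketched as below. The `post` and `mark_valid` callbacks are hypothetical stand-ins for the webhook request and for the call to the validate endpoint in the Subscription API; retry handling is left out.

```python
from typing import Callable


def validate_subscription(
    subscription: dict,
    post: Callable[[str], int],        # sends the request, returns the HTTP status
    mark_valid: Callable[[int], None],  # stand-in for the validate endpoint call
) -> bool:
    """Mark the subscription as valid only if its endpoint answers 200 OK."""
    status = post(subscription["endpointurl"])
    if status == 200:
        mark_valid(subscription["id"])
        return True
    # Any other status leaves the subscription unvalidated (retry would follow).
    return False


validated_ids = []
ok = validate_subscription(
    {"id": 7, "endpointurl": "https://subscriber.example/hook"},
    post=lambda url: 200,  # fake endpoint that always answers 200
    mark_valid=validated_ids.append,
)
```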