OpenTelemetry is an observability framework for cloud-native software to instrument, generate, collect, and export telemetry data (such as metrics, logs, and traces) for analysis. OpenTelemetry provides language-specific integrations automatically that capture relevant traces, metrics, and handle context propagation.

For Node.js, OpenTelemetry currently provides 13 plugins, which provide automatic tracing capabilities for gRPC (1.x), HTTP, HTTPS, MongoDB (2.x and 3.x), MySQL (2.x), PostgreSQL (pg 7.x/8.x and pg-pool 2.x/3.x), Redis (redis 2.x and ioredis 2.x/3.x/4.x), DNS, Express (4.x), Hapi, and Koa (2.x). However, if you use a different plugin (or non-supported version of a plugin) to access your database, or use a non-supported framework (Fastify, for example), then those standard plugins obviously will not work. You will either have to instrument your code manually, or write an instrumentation plugin. In this article we are going to show how to develop an instrumentation plugin for OpenTelemetry using better-sqlite3 as an example.

Note: the full source code of the example plugin is available in this GitHub repository.

We assume that you are familiar with the basic concepts of distributed tracing. In particular, you need to understand what trace and span are:

A trace is a collection of spans, which are objects that represent the work being done by individual services or components involved in a request as it flows through a system. A span contains a span context, which is a set of globally unique identifiers that represent the unique request that each span is a part of. Your system may handle dozens, hundreds, thousands, or millions of requests per second – each of these requests would have a single trace identifier, and each span in that request would have a unique span identifier. This span context is immutable and cannot be modified after creation.

A trace contains a single root span which encapsulates the end-to-end latency for the entire request. You can think of this as a single logical operation, such as clicking a button in a web application to add a product to a shopping cart. The root span would measure the time it took from an end-user clicking that button to the operation being completed or failing (so, the item is added to the cart or some error occurs) and the result being displayed to the user. A trace is comprised of the single root span and any number of child spans, which represent operations taking place as part of the request. Each span contains metadata about the operation, such as its name, start and end timestamps, attributes, events, and status.

The purpose of an instrumentation plugin is to augment the functionality of a package by automatically creating spans in some key points, and populating them with additional metadata. What a key point is, depends on a package. For example, for a database driver this can be a function that sends a query to the database: the span would then store information about the query (the query itself, query duration, database related information). For a web framework a span could correspond to a middleware handling the request.

Instrumentation Internals

Internally, OpenTelemetry relies upon two packages to instrument other packages. These are require-in-the-middle and shimmer.

require-in-the-middle hooks into the require function, which allows for modifying modules on-the-fly as they are being required. It is used by the core (specifically, by the plugin loader) to activate the plugin when the module it instruments is loaded. There is usually no need to use require-in-the-middle in your plugin.

shimmer provides methods to monkey-patch other modules. It is used by virtually all plugins: when the module to instrument is loaded, the plugin monkey-patches its functions to add instrumentation capabilities. When the plugin is disabled, it needs to undo its patches.

Instrumentation Plugins

Every instrumentation plugin implements the Plugin interface, and usually inherits from the BasePlugin class. Plugins must implement two methods: patch() and unpatch(), and initialize one required property, moduleName.

The very basic plugin will look like this:

import { BasePlugin } from '@opentelemetry/core';
import shimmer from 'shimmer';
import type moduleTypes from 'module-we-instrument';

export class MyInstrumentationPlugin extends BasePlugin<moduleTypes.InterfaceDescribingModuleExports> {
    public readonly supportedVersions = ['^7.0.0'];
    public static readonly COMPONENT = 'module-we-instrument';

    private enabled = false;

    public constructor(public readonly moduleName: string) {
        super('name-of-our-plugin');
    }

    protected patch(): moduleTypes.InterfaceDescribingModuleExports {
        if (!this.enabled) {
            this.enabled = true;

            // Monkey-patch the module as necessary
            // shimmer.wrap(this._moduleExports, 'some-function', (original) => original);
        }

        return this._moduleExports;
    }

    protected unpatch(): void {
        if (this.enabled) {
            this.enabled = false;

            // Undo our patches
            // shimmer.massUnwrap([this._moduleExports.prototype], ['some-function', 'some-other-function']);
        }
    }
}

export const plugin = new MyInstrumentationPlugin(MyInstrumentationPlugin.COMPONENT);

Let us see what is going on in this code.

In line 3 we import all types from the module we are going to instrument. Please note the use of import types construct: we are not allowed to import the module itself. This is because of the way require works: require-in-the-middle must first be configured to intercept a call to require('module-we-instrument') (this is what a usual import statement is transpiled into), and this only happens after the plugin loader loads our plugin. If we require() the module in our plugin, this will break instrumentation.

Line 6 declares the version of the module our instrumentation plugin supports. Versions are given as an array of the supported version in a semver format. The enable() method will be invoked only for compatible versions of the module. supportedVersions property is optional: if you don’t specify it, OpenTelemetry assumes that all versions of the module are supported.

Line 7 defines the COMPONENT property. It is not required, but by convention it contains the name of the module the plugin instruments.

Lines 11 to 13 define the plugin constructor: it initializes the moduleName property, and then calls the parent constructor passing the name of the plugin as the name of the tracer (you can pass any string).

Line 15 defines the patch() method. It is invoked when the instrumented module is loaded (or when the plugin gets enabled). Its function is to monkey-patch module functions to add automatic instrumentation. unpatch(), defined on line 26, undoes what patch() does.

Because there is nothing that stops the user from enabling the enabled and disabling the disabled plugin, we add a check to make sure that we are not monkey-patching the already patched module (and the same for unpatch).

This plugin does nothing, in the real plugin we would call shimmer.wrap() and provide our implementations augmenting functions of the module. Let us look at the real example.

Instrumenting better-sqlite3

better-sqlite3 provides a simple and easy to use interface to SQLite 3 from Node.js. Its API is pretty simple (and this is the reason why we chose it as an example).

First, we need to decide what “key points” are, what we are going to instrument. This is a database driver, therefore it will be wise to measure how long queries take. Thus we need to intercept all methods that send queries to the driver.

At a glance, these are:

Then, if we look at the code, we will see that we can exclude pragma from the list: internally, it runs Database#prepare() and then Statement#get() or Statement#all().

Unfortunately, we cannot do the same for Statement methods: run(), get(), all() use sqlite3_step(), which evaluates the prepared statement.

Let us start from Database. It is default-exported as a function, therefore instead of monkey-patching module exports, we will need to patch the function’s prototype.

Let’s go!

import { BasePlugin } from '@opentelemetry/core';
import type bs3Types from 'better-sqlite3';
import shimmer from 'shimmer';

export class BetterSqlite3Plugin extends BasePlugin<typeof bs3Types> {
    public readonly supportedVersions = ['^7.0.0'];
    public static readonly COMPONENT = 'better-sqlite3';

    private enabled = false;

    public constructor(public readonly moduleName: string) {
        super('opentelemetry-plugin-better-sqlite3');
    }

    protected patch(): typeof bs3Types {
        if (!this.enabled) {
            this.enabled = true;

            // TODO
        }

        return this._moduleExports;
    }

    protected unpatch(): void {
        if (this.enabled) {
            this.enabled = false;

            // TODO
        }
    }
}

export const plugin = new BetterSqlite3Plugin(BetterSqlite3Plugin.COMPONENT);

Let us patch the exec() method:

// patch():
const proto = this._moduleExports.prototype;
shimmer.wrap(proto, 'exec', this.patchExec);

// unpatch():
const proto = this._moduleExports.prototype;
shimmer.unwrap(proto, 'exec');

The patchExec property will look like this:

    private readonly patchExec = (original: (source: string) => bs3Types.Database): typeof original => {
        const self = this;
        return function (this: bs3Types.Database, ...params): ReturnType<typeof original> {
            const statement = params[0].trim().split(/\s/u)[0];
            const span = self._tracer.startSpan(statement, {
                kind: SpanKind.CLIENT,
                attributes: {
                    [DatabaseAttribute.DB_SYSTEM]: 'sqlite3',
                    [DatabaseAttribute.DB_STATEMENT]: params[0],
                    [DatabaseAttribute.DB_NAME]: this.name,
                },
            });

            return self._tracer.withSpan(span, () => {
                try {
                    const result = original.apply(this, params);
                    span.setStatus({ code: CanonicalCode.OK }).end();
                    return result;
                } catch (e) {
                    span.setStatus({ code: CanonicalCode.UNKNOWN, message: (e as Error).message }).end();
                    throw e;
                }
            });
        };
    };

The code defines a function, which, given the original function, returns a patched one — this is what shimmer expects. The patched function (line 3) is defined in a safe way: in JavaScript, you can pass more arguments to the function than its signature says. Therefore we are using “rest parameters” to accept and forward all arguments the caller may have passed. On lines 4 to 12, we create a new span and set some standard attributes. We then activate the newly created span in line 14: the entire function is run in the current span. On line 15 we invoke the original exec() function, passing all parameters the caller has passed us. If it succeeds, we mark the span as successful. Otherwise, if it fails, it throws an exception. We catch it, mark the span as failed, and rethrow the exception. In both cases, we end the span before the control leaves the function.

We can now write a simple test to check that everything works:

import { context } from '@opentelemetry/api';
import { NoopLogger } from '@opentelemetry/core';
import { NodeTracerProvider } from '@opentelemetry/node';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import { InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/tracing';
import bs3 from 'better-sqlite3';
import { BetterSqlite3Plugin, plugin } from '../lib';

describe('BetterSqlite3Plugin', () => {
    let contextManager: AsyncHooksContextManager;
    let connection: bs3.Database;
    const provider = new NodeTracerProvider({ plugins: {} });
    const logger = new NoopLogger();
    const memoryExporter = new InMemorySpanExporter();

    beforeAll(() => {
        provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter));
    });

    afterAll(() => connection.close());

    beforeEach(() => {
        contextManager = new AsyncHooksContextManager().enable();
        context.setGlobalContextManager(contextManager);
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        plugin.enable(bs3 as any, provider, logger);

        connection = new bs3(':memory:');
    });

    afterEach(() => {
        context.disable();
        memoryExporter.reset();
        plugin.disable();
    });

    describe('Database', () => {
        it('should patch Database#exec ', () => {
            const span = provider.getTracer('default').startSpan('test span');
            provider.getTracer('default').withSpan(span, () => {
                const sql = 'SELECT 2+2';
                connection.exec(sql);

                const spans = memoryExporter.getFinishedSpans();
                expect(spans).toHaveLength(1);
            });
        });
    });
});

The test should succeed, and if we print spans[0], we will see something similar to this:

    Span {
      attributes: {
        'db.system': 'sqlite3',
        'db.statement': 'SELECT 2+2',
        'db.name': ':memory:'
      },
      links: [],
      events: [],
      status: { code: 0 },
      endTime: [ 1602996004, 491110651 ],
      _ended: true,
      _duration: [ 0, 47992 ],
      name: 'SELECT',
      spanContext: {
        traceId: '9dc8328f1b38ec4b9d25903786db2a01',
        spanId: '3a517043eb1dbaaf',
        traceFlags: 1,
        traceState: undefined
      },
      parentSpanId: '796be098a37dfe3f',
      kind: 2,
      startTime: [ 1602996004, 491062659 ],
      resource: Resource {
        attributes: {
          'telemetry.sdk.language': 'nodejs',
          'telemetry.sdk.name': 'opentelemetry',
          'telemetry.sdk.version': '0.11.0'
        }
      },
      instrumentationLibrary: { name: 'opentelemetry-plugin-better-sqlite3', version: '*' },
      _logger: ConsoleLogger {
        info: [Function],
        warn: [Function],
        error: [Function]
      },
      _traceParams: {
        numberOfAttributesPerSpan: 32,
        numberOfLinksPerSpan: 32,
        numberOfEventsPerSpan: 128
      },
      _spanProcessor: MultiSpanProcessor { _spanProcessors: [ [SimpleSpanProcessor] ] }
    }

So far so good. Now let us try to patch the Statement class. But wait… better-sqlite3 does not export it. Life is never easy. We will have to patch Database#prepare() method: this is the only method that creates a Statement.

The code will look similar:

// patch():
shimmer.wrap(
    proto,
    'prepare',
    this.patchPrepare as (original: typeof proto['prepare']) => typeof proto['prepare'],
);

// unpatch():
shimmer.massUnwrap([proto], ['exec', 'prepare']);

@types/better-sqlite3 declares the prepare method as prepare<BindParameters extends any[] | {} = any[]>(source: string): BindParameters extends any[] ? Statement<BindParameters> : Statement<[BindParameters]>, and it is quite hard to deduct the type of original which would satisfy the compiler. Therefore we just cast the type to what the compiler expects.

Before we implement the patch function, let us refactor the code and move span creation logic to a separate method. This will avoid duplication of the code, because we will need to create a span in other places.

    private createSpan(query: string, db: bs3Types.Database, operation?: string): Span {
        const statement = query.trim().split(/\s/u)[0];
        const spanName = operation ? ${operation}: ${statement} : statement;
        return this._tracer.startSpan(spanName, {
            kind: SpanKind.CLIENT,
            attributes: {
                [DatabaseAttribute.DB_SYSTEM]: 'sqlite3',
                [DatabaseAttribute.DB_STATEMENT]: query,
                [DatabaseAttribute.DB_NAME]: db.name,
            },
        });
    }

The patchPrepare property definition:

    private readonly patchPrepare = (original: (source: string) => bs3Types.Statement): typeof original => {
        const self = this;
        return function (this: bs3Types.Database, ...params): ReturnType<typeof original> {
            const span = self.createSpan(params[0], this, 'prepare');
            return self._tracer.withSpan(span, () => {
                try {
                    const result = original.apply(this, params);
                    span.setStatus({ code: CanonicalCode.OK }).end();

                    shimmer.massWrap([result], ['run', 'get', 'all'], self.patchStatement);

                    return result;
                } catch (e) {
                    span.setStatus({ code: CanonicalCode.UNKNOWN, message: (e as Error).message }).end();
                    throw e;
                }
            });
        };
    };

patchPrepare looks very similar to patchExec; the only change is that we monkey-patch the returned Statement before passing it back to the caller. Note that we patch the Statement instance, not the prototype, and therefore we are unable to “unpatch” it if the plugin gets disabled.

Let us look at patchStatement:

    private readonly patchStatement = (original: (...params: unknown[]) => unknown): typeof original => {
        const self = this;
        return function (this: bs3Types.Statement, ...params): ReturnType<typeof original> {
            if (!self.enabled) {
                shimmer.unwrap(this, original.name as keyof bs3Types.Statement);
                return original.apply(this, params);
            }

            const span = self.createSpan(this.source, this.database, original.name);
            return self._tracer.withSpan(span, () => {
                try {
                    const result = original.apply(this, params);
                    span.setStatus({ code: CanonicalCode.OK }).end();
                    return result;
                } catch (e) {
                    span.setStatus({ code: CanonicalCode.UNKNOWN, message: (e as Error).message }).end();
                    throw e;
                }
            });
        };
    };

Because the plugin may get disabled by the user at any time, we check the enabled flag. If it is set to false, this means that the plugin is disabled, and we then don’t create a new span, but unpatch the method and call the original one instead.

Test the Plugin

Now that we have written our plugin, it is time to see it in action. We are going to create a very simple application using TheCatAPI, which will retrieve the list of cat breeds and store it to the database, and then instrument it. We will visualize the trace data in OpenZipkin.

The test application itself is very simple:

import Database from 'better-sqlite3';
import fetch from 'node-fetch';

const db = new Database(':memory:');
db.exec(
    'CREATE TABLE cat_breeds (id CHAR(4) NOT NULL PRIMARY KEY, name VARCHAR(255), wikipedia_url VARCHAR(255) NULL)',
);

interface BreedModel {
    id: string;
    name: string;
    wikipedia_url: string;
}

const insert = db.prepare('INSERT INTO cat_breeds (id, name, wikipedia_url) VALUES (@id, @name, @wikipedia_url)');
const trx = db.transaction((items: BreedModel[]) => {
    items.forEach(({ id, name, wikipedia_url = null }) => insert.run({ id, name, wikipedia_url }));
});

fetch('https://api.thecatapi.com/v1/breeds')
    .then((r) => r.json())
    .then((r: BreedModel[]) => trx(r))
    .catch((e) => console.error(e));

It initializes an in-memory database, creates a table, and prepares a transaction which will insert the data into the table. Then it issues a request to the Cat API, retrieves the list of cat breeds in the JSON format, parses it, and uses the prepared transaction to insert the list into the database.

Let us add the instrumentation code. It should be placed before the main code.

import { NodeTracerProvider } from '@opentelemetry/node';
import { SimpleSpanProcessor } from '@opentelemetry/tracing';
import { ZipkinExporter } from '@opentelemetry/exporter-zipkin';

const provider = new NodeTracerProvider({
    plugins: {
        https: {},
        'better-sqlite3': {
            path: __dirname,
        },
    },
});

const zipkinExporter = new ZipkinExporter({
    url: 'http://127.0.0.1:9411/api/v2/spans',
    serviceName: 'example',
});

const zipkinProcessor = new SimpleSpanProcessor(zipkinExporter);
provider.addSpanProcessor(zipkinProcessor);

provider.register();

After that we will need to create the root span, and run our code inside that span:

const span = provider.getTracer('default').startSpan('Root Span');
provider.getTracer('default').withSpan(span, () => {
    const db = new Database(':memory:');

    // ... the rest of the code

    fetch('https://api.thecatapi.com/v1/breeds')
        .then((r) => r.json())
        .then((r: BreedModel[]) => trx(r))
        .catch((e) => console.error(e))
        .finally(() => span.end()); // <------ we need to end the span
});

This step is usually not needed in the real web application: the root span is created when the application starts to process the incoming request, and is automatically ended when the application is through with the request. But for the command line application we need this step.

Before we run the code, we need to start Zipkin. If you have docker, this is as simple as running

docker run --rm -d -p 9411:9411 --name zipkin openzipkin/zipkin

If not, you will have to follow Zipkin’s Quickstart Guide.

After running the application (ts-node example.ts), we need to go to Zipkin (127.0.0.1:9443), and then we will see something like this:

We see that the code has created a root span, the root span has 82 child spans, and the span duration was 1.287 seconds.

By clicking the span, we can dig deeper and see more details:

We see the root span and some of the child spans: the one for CREATE TABLE statement, the one for preparation of the INSERT statements, then a bunch of spans for preparation of various transaction-related statements (these are the implementation details of better-sqlite3: on the very first transaction it prepares nine statements; this happens once). After the last prepare span, we see a long get /v1/breeds span (it corresponds to fetch('https://api.thecatapi.com/v1/breeds') statement). If we queried our own instrumented service, we would see its spans as well: OpenTelemetry supports distributed tracing, and this is a great thing when you have to deal with microservices. After the get span, there are many run spans: the first of them (begin) starts the transaction, many insert spans correspond to the forEach loop in the transaction, and the last span (not seen in the screenshot) will be the commit span, where the transaction is committed.

Things Left as an Exercise to the Reader

There is obviously a lot of room for improvement: for the sake of simplicity, we did not record the bound parameters passed to the query. Then, we have not implemented a patch for Statement#iterate() method. There are other places in the Database which can be instrumented, such as backup() and loadExtension() (not that they are used very often, but this still would be a good exercise). Finally, there is a difficult task: create a dedicated span for every transaction. For this you will have to patch Database#transaction() method, but the difficulty is that it returns an object with transaction methods as readonly non-configurable properties.


OpenTelemetry 0.13 Update

OpenTelemetry 0.13 dropped CanonicalCode from @opentelemetry/api in favor of StatusCode, and now there is a dedeicated StatusCode.ERROR code to replace CanonicalCode.UNKNOWN for error conditions.

How to Create an OpenTelemetry Instrumentation Plugin
Tagged on:             

Leave a Reply

Your email address will not be published. Required fields are marked *