William Liu

Trino

What is Trino?

Trino is a SQL query engine enabling SQL access to any data source. You can use Trino to query large data sets.

Local

Run the below to download a container image of trino (and name it trino-trial)

docker run -d -p 8080:8080 --name trino-trial trinodb/trino

Exec into the image and run a SQL query

docker exec -it trino-trial trino

trino> SELECT COUNT(*) FROM tpch.sf1.nation;
 _col0
-------
    25
(1 row)

Stop the container

docker stop trino-trial

Start the container

docker start trino-trial

You can run trino with a --debug option to enable debug information when running queries

Trino CLI

Run Trino in debug mode:

trino --debug

Run SQL through Trino CLI directly with --execute:

trino --execute 'SELECT nationkey,...'

Run trino and specify the --catalog and --schema through the CLI:

trino --catalog tpch --schema sf1 --execute 'SELECT nationkey,...'

Run trino and point to a SQL file:

trino -f nations.sql

You can output the data in various formats with --output-format, including JSON, CSV, etc.

You can ignore errors with the option --ignore-error.

Trino Configs

There’s sets of configs, usually in an etc directory

Data Sources (Catalogs)

In Trino, catalogs define the data sources available to users. You add to the connector.name property, exposing the schemas and tables inside the data source to Trino

SQL Commands

help to see what commands are available

trino> help

Supported commands:
QUIT
EXIT
CLEAR
EXPLAIN [ ( option [, ...] ) ] <query>
    options: FORMAT { TEXT | GRAPHVIZ | JSON }
             TYPE { LOGICAL | DISTRIBUTED | VALIDATE | IO }
DESCRIBE <table>
SHOW COLUMNS FROM <table>
SHOW FUNCTIONS
SHOW CATALOGS [LIKE <pattern>]
SHOW SCHEMAS [FROM <catalog>] [LIKE <pattern>]
SHOW TABLES [FROM <schema>] [LIKE <pattern>]
USE [<catalog>.]<schema>

Show the catalogs we have (from the above catalogs config). Each catalog is associated with a specific connector.

trino> show catalogs;
 Catalog
---------
 jmx
 memory
 system
 tpcds
 tpch
(5 rows)

See what schemas (a way to organize tables) are in a catalog

trino> show schemas from tpch;
       Schema
--------------------
 information_schema
 sf1
 sf100
 sf1000
 sf10000
 sf100000
 sf300
 sf3000
 sf30000
 tiny
(10 rows)

Show what tables (a set of unordered rows, organized into named columns with data types) are in a schema

trino> SHOW TABLES FROM tpch.sf1;
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
(8 rows)

You can select a catalog and schema to work with, then omit the qualifier from the query

trino> USE tpch.sf1;

trino:sf1> SELECT COUNT(*) FROM nation;
 _col0
-------
    25

Describe the data about a table (region)

trino:sf1> DESCRIBE tpch.sf1.region;
  Column   |     Type     | Extra | Comment
-----------+--------------+-------+---------
 regionkey | bigint       |       |
 name      | varchar(25)  |       |
 comment   | varchar(152) |       |
(3 rows)

Show what functions are available

trino:sf1> SHOW FUNCTIONS\G;
-[ RECORD 1 ]--+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Function       | abs
Return Type    | bigint
Argument Types | bigint
Function Type  | scalar
Deterministic  | true
Description    | Absolute value
-[ RECORD 2 ]--+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Function       | abs
Return Type    | decimal(p,s)
Argument Types | decimal(p,s)
Function Type  | scalar
Deterministic  | true
Description    | Absolute value

String Concatenation

SELECT nation.name || ' / ' || region.name AS Location
FROM tpch.sf1.region
JOIN tpch.sf1.nation
ON region.regionkey = nation.regionkey
AND region.name LIKE 'AFRICA'
ORDER BY Location;

Connectors

Trino Architecture

At a high level, a Trino cluster is composed of one coordinator and multiple worker nodes. All communication and data transfer between clients, coordinator, and workers use REST-based interactions over HTTP/HTTPS.

A coordinator is a Trino server that handles incoming queries and manages the workers to execute the queries.

Under the hood, Trino has a service provider interface (SPI) that defines the functionality a connector has to implement for specific features. Each connector implements three parts of the API:

For example, listTables SPI means Trino can use the same method to ask any connector to check for a list of all tables.

Query Execution Model

In big steps: SQL statement -> Parser/Analyzer -> Planner/Optimizer -> Workers

The distributed query plan is an extension of the simple query plan consisting of one or more stages.

Having more than one stage results in the creation of a dependency tree of stages. The complexity of the query determines the number of stages.

SQL Optimization Rules

SQL rules with a goal to reduce query processing time, a query’s memory footprint, or the amount of data exchanged over the network.

Predicate Pushdown is the easiest to understand and the most important optimization. Its role is to remove the filtering conditions as close to the source of the data as possible. As a result, data reduction happens as early as possible during query executions.

For example, it might transform a Filter into a simpler Filter.

Cross Join Elimination is in the absence of the cost-based optimizer, Trino joins the tables contained in the SELECT query in the order of their appearance in the query text. The exception to this is when the tables to be joined have no joining condition, which results in a cross join. A cross join is unwanted in almost all practical cases.

TopN rolls ORDER BY followed by LIMIT into a TopN plan node. During query execution, TopN keeps the desired number of rows in a heap data structure, updating the heap while reading input data in a streaming fashion.

Partial Aggregations

TODO

Apache Hadoop and Hive

History

Hadoop consists of the Hadoop Distributed File System (HDFS) and application software, e.g. Hadoop MapReduce, to interact with the data stored in HDFS. Apache YARN is used to manage the resources needed by Hadoop applications. Data processing was performed with MapReduce programs (which enabled data processing to be distributed across a cluster) MapReduce is cumbersome for some use cases (e.g. answering analytical questions)

Hive is an alternative to using MapReduce. It was created as a SQL layer of abstraction on top of Hadoop, to interact with data in HDFS using a SQL-like syntax. Hive data is sstored as files (aka objects) in HDFS. These files can have various formats like ORC and Parquet. The files are stored in a particular directory and file layout that hive understands (i.e. Hive-style table format) We have Hive metadata that describes how data stored in HDFS maps to schemas, tables, and columns to be queried via the Hive query language, and indirectly with Trino and SQL. The metadata information is persisted in a database (e.g. MySQL, PostgreSQL) and is accessible via the Hive Metastore Service (HMS).

Hive Connector

The Hive connector for Trino allows you to connect to an HDFS object storage cluster. It leverages the metadata in HMS and queries and processes the data stored in HDFS. The most common use case of Trino is to leverage the Hive connector to read data from distributed storage such as HDFS or cloud storage.

Trino and the Trino Hive connector do not use the Hive runtime at all. Trino is a high-performance replacement for it and is suitable for running interactive queries. It works directly with files instead of using the Hive runtime and execution engine.

The hive connector is not constrained to HDFS, but works with distributed storage in general (e.g. HDFS, S3, Azure Blob, Azure Data Lake Storage, Google Cloud Storage, S3-compatible storage) as long as they implement the S3 API.

The main limitation is that it cannot push SQL processing to Hive. The schema information is accessed through HMS and the data layout is the same as with a Hive data warehouse.

Example usage:

Create schema / database

CREATE SCHEMA datalake.web
WITH (location = 's3://example-org/wb')

where we have data of:

s3://example-org/web/customers
s3://example-org/web/clicks
s3://example-org/web/sessions

Data in Trino

The Data Definition Language (DDL) for creating a Trino table. The below table page_views stores data under a directory also named page_views. The page_views directory is either a subdirectory under the directory defined in hive.metastore.warehouse.dir or is a different directory if you defined the schema location when creating the schema.

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)

If you already have data (e.g. in HDFS or in S3), the DDL for a Trino table to point to existing data looks like:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)
WITH (
  external_location = 's3://starburst-external/page_views'
)

Partitioned Data

Data partitioning is a technique used to horizontally divide a logical table into smaller pieces of data known as partitions. You can partition off a single or multiple columns. Example:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)
WITH (
  partitioned_by = ARRAY['view_date', 'country']
)

Partitioning gives improved query performance. When a partition uses the columns in WHERE, it reads the only the partition that you need.

Inserting Data

Trino supports:

INSERT INTO ... VALUES
INSERT INTO ... SELECT
CREATE TABLE AS SELECT

The INSERT INTO has limited use since it creates a single file and single row for each statement, which makes it limited use (for learning).

Example INSERT INTO

INSERT INTO page_views_ext SELECT * FROM page_views;

You’ll often use INSERT SELECT and CREATE TABLE, which perform the same function.

To autodiscover and sync partitions, you can call:

CALL system.sync_partition_metadata(
  'web',
  'page_views',
  'FULL'
)

You can also create partitions manually (e.g. empty partitions)

CALL system.create_empty_partition(
  'web',
  'page_views',
  ARRAY['view_date'],
  ARRAY['2019-01-14']
)

File formats

You can setup the format using WITH:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  ds_date,
  country varchar
)
WITH (
  format = 'PARQUET'
)

System Queries

What queries are currently running:

SELECT
    *
FROM
    system.runtime.queries
WHERE
    state = 'RUNNING' LIMIT 10;