Trino is a SQL query engine enabling SQL access to any data source. You can use Trino to query large data sets.
Run the below to download a container image of trino (and name it trino-trial
)
docker run -d -p 8080:8080 --name trino-trial trinodb/trino
Exec into the image and run a SQL query
docker exec -it trino-trial trino
trino> SELECT COUNT(*) FROM tpch.sf1.nation;
_col0
-------
25
(1 row)
Stop the container
docker stop trino-trial
Start the container
docker start trino-trial
You can run trino with a --debug
option to enable debug information when running queries
Run Trino in debug mode:
trino --debug
Run SQL through Trino CLI directly with --execute
:
trino --execute 'SELECT nationkey,...'
Run trino and specify the --catalog
and --schema
through the CLI:
trino --catalog tpch --schema sf1 --execute 'SELECT nationkey,...'
Run trino and point to a SQL file:
trino -f nations.sql
You can output the data in various formats with --output-format
, including JSON
, CSV
, etc.
You can ignore errors with the option --ignore-error
.
There’s sets of configs, usually in an etc
directory
etc/config.properties
)etc/node.properties
)etc/jvm.config
)/etc/catalog/salesproperties
, /etc/catalog/mysql-dev.properties
In Trino, catalogs define the data sources available to users.
You add to the connector.name
property, exposing the schemas and tables inside the data source to Trino
help
to see what commands are available
trino> help
Supported commands:
QUIT
EXIT
CLEAR
EXPLAIN [ ( option [, ...] ) ] <query>
options: FORMAT { TEXT | GRAPHVIZ | JSON }
TYPE { LOGICAL | DISTRIBUTED | VALIDATE | IO }
DESCRIBE <table>
SHOW COLUMNS FROM <table>
SHOW FUNCTIONS
SHOW CATALOGS [LIKE <pattern>]
SHOW SCHEMAS [FROM <catalog>] [LIKE <pattern>]
SHOW TABLES [FROM <schema>] [LIKE <pattern>]
USE [<catalog>.]<schema>
Show the catalogs we have (from the above catalogs config). Each catalog is associated with a specific connector.
trino> show catalogs;
Catalog
---------
jmx
memory
system
tpcds
tpch
(5 rows)
See what schemas (a way to organize tables) are in a catalog
trino> show schemas from tpch;
Schema
--------------------
information_schema
sf1
sf100
sf1000
sf10000
sf100000
sf300
sf3000
sf30000
tiny
(10 rows)
Show what tables (a set of unordered rows, organized into named columns with data types) are in a schema
trino> SHOW TABLES FROM tpch.sf1;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)
You can select a catalog and schema to work with, then omit the qualifier from the query
trino> USE tpch.sf1;
trino:sf1> SELECT COUNT(*) FROM nation;
_col0
-------
25
Describe the data about a table (region
)
trino:sf1> DESCRIBE tpch.sf1.region;
Column | Type | Extra | Comment
-----------+--------------+-------+---------
regionkey | bigint | |
name | varchar(25) | |
comment | varchar(152) | |
(3 rows)
Show what functions are available
trino:sf1> SHOW FUNCTIONS\G;
-[ RECORD 1 ]--+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Function | abs
Return Type | bigint
Argument Types | bigint
Function Type | scalar
Deterministic | true
Description | Absolute value
-[ RECORD 2 ]--+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Function | abs
Return Type | decimal(p,s)
Argument Types | decimal(p,s)
Function Type | scalar
Deterministic | true
Description | Absolute value
String Concatenation
SELECT nation.name || ' / ' || region.name AS Location
FROM tpch.sf1.region
JOIN tpch.sf1.nation
ON region.regionkey = nation.regionkey
AND region.name LIKE 'AFRICA'
ORDER BY Location;
At a high level, a Trino cluster is composed of one coordinator and multiple worker nodes. All communication and data transfer between clients, coordinator, and workers use REST-based interactions over HTTP/HTTPS.
A coordinator is a Trino server that handles incoming queries and manages the workers to execute the queries.
Under the hood, Trino has a service provider interface (SPI) that defines the functionality a connector has to implement for specific features. Each connector implements three parts of the API:
For example, listTables
SPI means Trino can use the same method to ask any connector to check for a list of all tables.
In big steps: SQL statement -> Parser/Analyzer -> Planner/Optimizer -> Workers
The distributed query plan is an extension of the simple query plan consisting of one or more stages.
Having more than one stage results in the creation of a dependency tree of stages. The complexity of the query determines the number of stages.
SQL rules with a goal to reduce query processing time, a query’s memory footprint, or the amount of data exchanged over the network.
Predicate Pushdown is the easiest to understand and the most important optimization. Its role is to remove the filtering conditions as close to the source of the data as possible. As a result, data reduction happens as early as possible during query executions.
For example, it might transform a Filter
into a simpler Filter
.
Cross Join Elimination is in the absence of the cost-based optimizer, Trino joins the tables contained
in the SELECT
query in the order of their appearance in the query text. The exception to this is when
the tables to be joined have no joining condition, which results in a cross join. A cross join is unwanted in almost
all practical cases.
TopN rolls ORDER BY
followed by LIMIT
into a TopN plan node. During query execution, TopN keeps
the desired number of rows in a heap data structure, updating the heap while reading input data in a streaming fashion.
Partial Aggregations
TODO
History
Hadoop consists of the Hadoop Distributed File System (HDFS) and application software, e.g. Hadoop MapReduce, to interact with the data stored in HDFS. Apache YARN is used to manage the resources needed by Hadoop applications. Data processing was performed with MapReduce programs (which enabled data processing to be distributed across a cluster) MapReduce is cumbersome for some use cases (e.g. answering analytical questions)
Hive is an alternative to using MapReduce. It was created as a SQL layer of abstraction on top of Hadoop, to interact with data in HDFS using a SQL-like syntax. Hive data is sstored as files (aka objects) in HDFS. These files can have various formats like ORC and Parquet. The files are stored in a particular directory and file layout that hive understands (i.e. Hive-style table format) We have Hive metadata that describes how data stored in HDFS maps to schemas, tables, and columns to be queried via the Hive query language, and indirectly with Trino and SQL. The metadata information is persisted in a database (e.g. MySQL, PostgreSQL) and is accessible via the Hive Metastore Service (HMS).
The Hive connector for Trino allows you to connect to an HDFS object storage cluster. It leverages the metadata in HMS and queries and processes the data stored in HDFS. The most common use case of Trino is to leverage the Hive connector to read data from distributed storage such as HDFS or cloud storage.
Trino and the Trino Hive connector do not use the Hive runtime at all. Trino is a high-performance replacement for it and is suitable for running interactive queries. It works directly with files instead of using the Hive runtime and execution engine.
The hive connector is not constrained to HDFS, but works with distributed storage in general (e.g. HDFS, S3, Azure Blob, Azure Data Lake Storage, Google Cloud Storage, S3-compatible storage) as long as they implement the S3 API.
The main limitation is that it cannot push SQL processing to Hive. The schema information is accessed through HMS and the data layout is the same as with a Hive data warehouse.
Example usage:
Create schema / database
CREATE SCHEMA datalake.web
WITH (location = 's3://example-org/wb')
where we have data of:
s3://example-org/web/customers
s3://example-org/web/clicks
s3://example-org/web/sessions
The Data Definition Language (DDL) for creating a Trino table.
The below table page_views
stores data under a directory also named page_views
.
The page_views
directory is either a subdirectory under the directory defined in
hive.metastore.warehouse.dir
or is a different directory if you defined the schema location
when creating the schema.
CREATE TABLE datalake.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
view_date date,
country varchar
)
If you already have data (e.g. in HDFS or in S3), the DDL for a Trino table to point to existing data looks like:
CREATE TABLE datalake.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
view_date date,
country varchar
)
WITH (
external_location = 's3://starburst-external/page_views'
)
Data partitioning is a technique used to horizontally divide a logical table into smaller pieces of data known as partitions. You can partition off a single or multiple columns. Example:
CREATE TABLE datalake.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
view_date date,
country varchar
)
WITH (
partitioned_by = ARRAY['view_date', 'country']
)
Partitioning gives improved query performance. When a partition uses the columns in WHERE
, it reads the only
the partition that you need.
Trino supports:
INSERT INTO ... VALUES
INSERT INTO ... SELECT
CREATE TABLE AS SELECT
The INSERT INTO
has limited use since it creates a single file and single row for each statement, which
makes it limited use (for learning).
Example INSERT INTO
INSERT INTO page_views_ext SELECT * FROM page_views;
You’ll often use INSERT SELECT
and CREATE TABLE
, which perform the same function.
To autodiscover and sync partitions, you can call:
CALL system.sync_partition_metadata(
'web',
'page_views',
'FULL'
)
You can also create partitions manually (e.g. empty partitions)
CALL system.create_empty_partition(
'web',
'page_views',
ARRAY['view_date'],
ARRAY['2019-01-14']
)
You can setup the format using WITH
:
CREATE TABLE datalake.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
ds_date,
country varchar
)
WITH (
format = 'PARQUET'
)
What queries are currently running:
SELECT
*
FROM
system.runtime.queries
WHERE
state = 'RUNNING' LIMIT 10;