Connecting Apache Airflow to superQuery

These instructions explain how to connect Apache Airflow to superQuery’s query optimization engine.
 
When connected, your queries will pass through superQuery — where they will be automatically optimized — before being executed in BigQuery.
 
The result: a significant reduction in query costs and execution time.

General 

  1. Click on the Integrations link in the sidebar of superQuery:

  2. Enable your desired connection by clicking “Connect”.
  3. After clicking “Connect”, you’ll see the username and password screen, like so:

  4. Whitelist your IP address if necessary.
  5. Copy the hostname and port from this window into your service of choice:
    • server: proxy.superquery.io
    • port: 3306
  6. Finalize the connection.
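
If you want a quick sanity check of the proxy before wiring up Airflow, a minimal Python sketch could look like the one below. It assumes the pymysql package is installed (any MySQL client library would do), the credentials are placeholders for the ones shown in the superQuery UI, and whether a bare SELECT 1 is forwarded to BigQuery or answered by the proxy itself is an implementation detail; the point is only that the connection succeeds.

import pymysql

# Hypothetical credentials -- use the username/password from the superQuery integrations screen
connection = pymysql.connect(
    host="proxy.superquery.io",
    port=3306,
    user="YOUR_SUPERQUERY_USERNAME",
    password="YOUR_SUPERQUERY_PASSWORD",
)

with connection.cursor() as cursor:
    # Any trivial statement will do; we only care that the proxy accepts the connection
    cursor.execute("SELECT 1")
    print(cursor.fetchall())

connection.close()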

Prerequisites

    • A working Apache Airflow system, either:
      • locally, OR
      • on your own server or Kubernetes cluster, OR
      • on Google Cloud Composer
    • The SuperQueryOperator copied into the plugins folder on the relevant system

Knowing your costs in Airflow

While your Airflow DAGs are churning away happily and pushing data to your processing systems of choice, a heap of logging happens in the background. Airflow’s logs are easily accessible, simple to read, and give you a good overview of what your DAG is doing. Wouldn’t it be great if the log files could also show you your query execution plan, and specifically the cost and total data scanned? For sure it would!

Click on View Log after clicking on the Task you are monitoring in Airflow:

The SuperQueryOperator is a wrapper around the MySqlOperator and hook, and will connect to the superQuery proxy when given the correct connection details.

Make sure that you go to Admin → Connections in the Airflow UI and edit the mysql_default connection:
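
If you prefer to script this rather than click through the UI, a rough sketch that writes the connection details into Airflow’s metadata database (field names per Airflow 1.10; the login details are placeholders) could look like this:

from airflow import settings
from airflow.models import Connection

session = settings.Session()

# Fetch the existing mysql_default connection, or create it if it's missing
conn = (session.query(Connection)
        .filter(Connection.conn_id == "mysql_default")
        .first()) or Connection(conn_id="mysql_default")

conn.conn_type = "mysql"
conn.host = "proxy.superquery.io"
conn.port = 3306
conn.login = "YOUR_SUPERQUERY_USERNAME"       # placeholder
conn.password = "YOUR_SUPERQUERY_PASSWORD"    # placeholder; encrypted at rest if Fernet is configured

session.add(conn)
session.commit()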

After connecting and running a DAG that uses the SuperQueryOperator, you should see this in your log files:

--------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------
[2019-03-11 21:12:02,129] {models.py:1593} INFO - Executing 
<Task(SuperQueryOperator): connect_to_superquery_proxy> on 
2019-03-01T00:00:00+00:00
[2019-03-11 21:12:03,836] {superq_operators.py:54} INFO - Executing:
#standardSQL
SELECT COUNT(testField) FROM `mydata.PROD.myTable`;
[2019-03-11 21:12:03,844] {logging_mixin.py:95} INFO - 
[2019-03-11 21:12:03,843] {base_hook.py:83} INFO - Using connection to: id: 
mysql_default. Host: superproxy.system.io, Port: 3306, Schema: None, Login: 
XXXXXX, Password: XXXXXXXX, extra: {}
[2019-03-11 21:12:15,172] {superq_operators.py:68} INFO - ((
'{
  "startTime":1552331525642,
  "endTime":1552331534624,
  "executionTime":"8988",
  "bigQueryTotalBytesProcessed":26388279066,
  "bigQueryTotalCost":"0.12",
  "superQueryTotalBytesProcessed":0,
  "superQueryTotalCost":"0.00",
  "saving":0,
  "totalRows":"1",
}', '', '1', 'true'),)
[2019-03-11 21:12:17,121] {logging_mixin.py:95} INFO - 
[2019-03-11 21:12:17,119] {jobs.py:2527} INFO - 
Task exited with return code 0

This chunk of log tells you that your BigQuery operator in Airflow scanned 26,388,279,066 bytes (roughly 24.6 GiB) of data and that it cost you $0.12, which lines up with BigQuery’s on-demand rate of about $5 per TiB scanned. Simple.

You can also go ahead and parse the log files in a 3rd party tool or a bash script and create a summary of the costs originating from DAGs that scan data from BigQuery.
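
As a starting point, here is a rough Python sketch of that kind of summary. The log directory layout, the .log file extension, and the regular expressions are assumptions based on the sample output above (note that the payload printed by the operator is not strictly valid JSON, so the sketch pulls out individual fields rather than calling json.loads):

import os
import re
from collections import defaultdict

# Assumed location of Airflow task logs; adjust to your setup
LOG_DIR = os.path.expanduser("~/airflow/logs")

COST_RE = re.compile(r'"bigQueryTotalCost":"(?P<cost>[0-9.]+)"')
BYTES_RE = re.compile(r'"bigQueryTotalBytesProcessed":(?P<bytes>\d+)')

costs = defaultdict(float)
scanned = defaultdict(int)

for root, _dirs, files in os.walk(LOG_DIR):
    for name in files:
        if not name.endswith(".log"):
            continue
        # Logs are assumed to live under LOG_DIR/<dag_id>/<task_id>/...
        dag_id = os.path.relpath(root, LOG_DIR).split(os.sep)[0]
        with open(os.path.join(root, name)) as handle:
            text = handle.read()
        for match in COST_RE.finditer(text):
            costs[dag_id] += float(match.group("cost"))
        for match in BYTES_RE.finditer(text):
            scanned[dag_id] += int(match.group("bytes"))

for dag_id in sorted(costs):
    print("{}: ${:.2f}, {:.1f} GiB scanned".format(
        dag_id, costs[dag_id], scanned[dag_id] / 2**30))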

How does this all work?

superQuery uses a MySQL proxy in order to make connectivity universal and to offer a SQL interface for retrieving query information.
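
In other words, the operator simply talks to the proxy as if it were a MySQL server: it sends the BigQuery SQL, then issues a plain explain; statement to fetch the cost payload shown in the logs above. A stripped-down sketch of that exchange, using Airflow’s MySqlHook against the mysql_default connection configured earlier:

from contextlib import closing

from airflow.hooks.mysql_hook import MySqlHook

hook = MySqlHook(mysql_conn_id="mysql_default")

with closing(hook.get_conn()) as conn:
    with closing(conn.cursor()) as cur:
        # The SQL is passed straight through the proxy to BigQuery
        cur.execute("#standardSQL\nSELECT COUNT(*) FROM `mydata.PROD.myTable`;")
        # A follow-up "explain;" returns the execution and cost payload for the previous query
        cur.execute("explain;")
        print(cur.fetchall())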

What you need next: The SuperQueryOperator

These are the steps involved in getting the functionality described above:

  1. Add the superQuery plugin to Airflow so you can use the SuperQueryOperator. Place it in the AIRFLOW_HOME/plugins folder, or, if you’re running Airflow on Google Cloud Composer, place the SuperQueryOperator in your environment’s Cloud Storage bucket under /plugins.
  2. Subscribe to superQuery for a trial and get login details for the superQuery MySQL proxy.
  3. Test your connection to the proxy with the DAG provided below.
  4. Replace your BigQuery operators with the SuperQueryOperator in your own DAGs when you want to use this functionality (see the sketch just below this list).
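
For example, a task that currently runs through a plain BigQuery operator can be pointed at the proxy with a near drop-in swap. The BigQueryOperator signature in the comment is indicative only and depends on your Airflow version, and REPORT_SQL and the task id are placeholders:

# Before: a task using the stock BigQuery operator (indicative signature)
# from airflow.contrib.operators.bigquery_operator import BigQueryOperator
# daily_report = BigQueryOperator(
#     task_id="daily_report",
#     sql=REPORT_SQL,
#     use_legacy_sql=False,
#     dag=dag)

# After: the same query routed through the superQuery proxy,
# with cost and bytes-scanned information written to the task log
daily_report = SuperQueryOperator(
    task_id="daily_report",
    sql=REPORT_SQL,
    database="",
    explain=True,
    dag=dag)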

Using the SuperQueryOperator

This is the interface for the SuperQuery operator:

TEST_SQL = """#standardSQL
SELECT COUNT(*) FROM `mydata.PROD.myTable`;"""

SuperQueryOperator(
    task_id="connect_to_superquery_proxy",
    sql=TEST_SQL,
    database="",
    explain=True,  # False if you don't want the execution plan and cost information in the logs
    dag=dag)


This code goes into the /plugins folder in Airflow:

import logging
import sys
from contextlib import closing

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.hooks.mysql_hook import MySqlHook

LOG = logging.getLogger(__name__)
class SuperQueryOperator(BaseOperator):
    """
    Executes sql code in Google Bigquery via the SuperQuery proxy MySql interface
    :param sql: the sql code to be executed. Can receive a str representing a
        sql statement, a list of str (sql statements), or reference to a template file.
        Template reference are recognized by str ending in '.sql'
        (templated)
    :type sql: str or list[str]
    :param mysql_conn_id: reference to a specific mysql database
    :type mysql_conn_id: str
    :param parameters: (optional) the parameters to render the SQL query with.
    :type parameters: mapping or iterable
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
    :type autocommit: bool
    :param database: name of database which overwrite defined one in connection
    :type database: str
    :param explain: if True, the execution plan (and cost) of the sql is returned to the Airflow logs.
        (default value: False)
    :type explain: bool
    """
    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, database=None, explain=False, *args, **kwargs):
        super(SuperQueryOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        self.explain = explain
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        
        if (self.explain):
            test_answer = self._get_records(
                hook,
                [self.sql, """explain;"""],
                parameters=self.parameters)
            self.log.info(test_answer)
        else:
            test_answer = self._get_records(
                hook,
                self.sql,
                parameters=self.parameters)
            self.log.info(test_answer)
    def _get_records(self, hook, sql, parameters=None):
        """
        Executes the sql and returns a set of records.
        :param sql: the sql statement to be executed (str)
        :type sql: str
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        if sys.version_info[0] < 3:
            sql = sql.encode('utf-8')
        with closing(hook.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # In this scenario we only execute the SQL 
                # and don't return the result
                if (isinstance(sql, str)):
                    if parameters is not None:
                        cur.execute(sql, parameters)
                    else:
                        cur.execute(sql)
                # In this scenario, we have the SQL and the Explain;
                # This means we return the Explain payload
                if (isinstance(sql, list)):
                    if (len(sql) > 2): 
                        raise AirflowException("Can only execute a single SQL statement, not a list of statements.")
                    for s in sql:
                        if parameters is not None:
                            cur.execute(s, parameters)
                        else:
                            cur.execute(s)
                    return cur.fetchall()
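
Depending on your Airflow version, you may also need to register the operator as a plugin so that the from airflow.operators import SuperQueryOperator line in the test DAG below resolves; a minimal sketch, appended to the same file and using the AirflowPlugin import at the top, might look like this (the plugin name is an assumption; check your version’s plugin documentation for the exact import path it exposes):

class SuperQueryPlugin(AirflowPlugin):
    """Registers the SuperQueryOperator with Airflow's plugin manager."""
    name = "superquery_plugin"
    operators = [SuperQueryOperator]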


Use this DAG to test the connection to superQuery:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import SuperQueryOperator
TEST_SQL = """#standardSQL
SELECT COUNT(*) FROM `yourproject.yourdataset.yourtable`;
"""
default_args = {
    "owner": "yooryourcompany",
    "depends_on_past": True,
    "start_date": datetime(2019, 3, 1), #start after this date
    "email": ["you@yourcompany.io"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
}
schedule_interval = "@once" # Run just once
dag = DAG("superquery_connection_test",
          default_args=default_args,
          schedule_interval=schedule_interval)
t_start = DummyOperator(task_id="start_here", dag=dag)
t_check_superquery_connection = SuperQueryOperator(
    task_id="connect_to_superquery_proxy",
    sql=TEST_SQL,
    database="",
    explain=True,
    dag=dag
)
# The DAG flow
t_start >> t_check_superquery_connection

Boom! You’re all set.

Query on! 🙂
