GOOGLE ADS

Mittwoch, 27. April 2022

So stellen Sie eine Verbindung zu Postgres her, indem Sie eine Postgres-Verbindungs-ID in einem aufrufbaren Python verwenden

Ich verwende den Python-Operator von Airflow, um eine Python-Funktion aufzurufen. Der ERROR tritt im Try/Except-Block auf.

def python_callable_new():
print("Inside python callable...")
import psycopg2
try:
print("attempting database connection from python method.. ")
conn = psycopg2.connect('postgres_defined_connection')
print("success. ")
except Exception as error:
print("failed: ")
print (error)
return 'End of callable. '

with dag:
start_task = DummyOperator( task_id= "start" )
stop_task = DummyOperator( task_id= "stop" )

do_python_task = PythonOperator(
task_id = 'do-py-operation',
python_callable= python_callable_new,
)
extract_source_data = PostgresOperator(
task_id='extract-cb-source-data',
postgres_conn_id='postgres_defined_connection',
sql='./sql_scripts/extract_csv_data.sql'
)
# csv_to_postgres
start_task >> do_python_task >> extract_source_data >> stop_task

Grundsätzlich ist meine Frage


  • Wie kann ich meine 'postgres_defined_connection' verwenden, um eine Verbindung zu Postgres innerhalb meiner Python-Funktion herzustellen?

  • Es stellt eine gute Verbindung her, wenn ich den PostgresOperator verwende, wie in der Aufgabe extract_source_data zu sehen ist, aber ich muss ihn in der aufrufbaren Funktion verwenden.

  • Der Fehler, der kommt, ist ungültiges dsn: fehlendes "=" nach "postgres_defined_connection" in Verbindungsinfo-String


(FYI - Ich speichere die postgres_defined_connection in einer separaten Datei "connections.py", die die sqlalchemy-Engine und PostgresHook verwendet.)


Lösung des Problems

psycopg2.connecterwartet Verbindungsparameter. Sie können ihnen eine einzelne Zeichenfolge übergeben, wenn Sie Ihre Verbindungsparameter als durch Leerzeichen getrennte Schlüssel/Wert-Paare formatieren. Aus diesem Grund erhalten Sie die Fehlermeldung "=" fehlt.

Weitere Informationen finden Sie in der psycopg-Dokumentation.

Um eine Verbindung zu einer Postgres-Datenbank in Airflow herzustellen, können Sie den PostgresHook nutzen, vorausgesetzt, Sie haben eine Verbindung erstellt.

from airflow.hooks.postgres_hook import PostgresHook

def execute_query_with_conn_obj(query):
hook = PostgresHook(postgres_conn_id='my_connection')
conn = hook.get_conn()
cur = conn.cursor()
cur.execute(query)
def execute_query_with_hook(query):
hook = PostgresHook(postgres_conn_id='my_connection')
hook.run(sql=query)

Sie können dies auch mit reinem Python-Code tun.

def execute_query_with_psycopg(query):
conn_args = dict(
host='myhost';,
user='admin',
password='password',
dbname='my_schema',
port=5432)
conn = psycopg2.connect(**conn_args)
cur = conn.cursor()
cur.execute(query)
def execute_query_with_psycopg_string(query):
conn = psycopg2.connect("dbname=test user=postgres password=secret")
cur = conn.cursor()
cur.execute(query)

Keine Kommentare:

Kommentar veröffentlichen

Warum werden SCHED_FIFO-Threads derselben physischen CPU zugewiesen, obwohl CPUs im Leerlauf verfügbar sind?

Lösung des Problems Wenn ich das richtig verstehe, versuchen Sie, SCHED_FIFO mit aktiviertem Hyperthreading ("HT") zu verwenden, ...