Pular para conteúdo

Utils

check_valide_date(date)

Verifica se uma determinada data está no formato 'AAAA-MM-DD'.

Parameters:

Name Type Description Default
date str

A data a ser verificada.

required

Returns:

Name Type Description
bool bool

True se a date estiver no formato correto, False caso contrário.

Source code in source/utils.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def check_valide_date(date: str) -> bool:
    """
    Verifica se uma determinada data está no formato 'AAAA-MM-DD'.

    Args:
        date (str): A data a ser verificada.

    Returns:
        bool: True se a date estiver no formato correto, False caso contrário.
    """

    # Tentamos analisar a data usando o formato '%Y-%m-%d'. Se falhar,
    # retornamos False. Caso contrário, retornamos True.
    try:
        datetime.strptime(date, '%Y-%m-%d')
        return True
    except ValueError:
        return False

create_url_filter(date=None)

Cria a URL para a requisição.

Esta função recebe um parâmetro opcional 'data' que representa a data dos artigos de notícias. Se nenhuma data for fornecida, ela assume a data atual do sistema no formato 'AAAA-MM-DD'.

Parameters:

Name Type Description Default
date str ou None

A data dos artigos de notícias. O valor padrão é None.

None
Retorna

str: A URL completa para a requisição.

Source code in source/utils.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def create_url_filter(date: Union[str, None] = None) -> List[str]:
    """
    Cria a URL para a requisição.

    Esta função recebe um parâmetro opcional 'data' que representa a data
    dos artigos de notícias. Se nenhuma data for fornecida, ela assume a data
    atual do sistema no formato 'AAAA-MM-DD'.

    Args:
        date (str ou None): A data dos artigos de notícias. O valor padrão é None.

    Retorna:
        str: A URL completa para a requisição.
    """

    # Define a data padrão se nenhuma data for fornecida ou estiver no formato incorreto.
    # if check_valide_date(date) == False or date is None:
    #     date = datetime.today().strftime("%Y-%m-%d")
    urls = []

    # Obtém os parâmetros de configuração
    params = config_url()
    q1 = params["query_1"]
    q2 = params["query_2"]
    q3 = params["query_3"]
    queries = [q1, q2, q3]
    password = params["key_password"]
    from_date, to_date = calculate_date()

    # Construção da URL
    for query in queries:    

        url = (
            f'https://newsapi.org/v2/everything?q={query}'
            f'&language=en&apiKey={password}' #from={from_date}&to={to_date}&language=en&sortBy=publishedAt
        )
        urls.append(url)

    return urls

escape_string(value)

Escapa uma string substituindo aspas simples por duas aspas simples.

Source code in source/utils.py
350
351
352
def escape_string(value):
    """Escapa uma string substituindo aspas simples por duas aspas simples."""
    return value.replace("'", "''")

insert_request_df(df)

Insere um dataframe em um banco de dados PostgreSQL.

Parameters:

Name Type Description Default
df DataFrame

O dataframe a ser inserido.

required

Esta função se conecta a um banco de dados PostgreSQL, cria um cursor, converte o dataframe para uma string CSV, escreve a string CSV em um objeto StringIO e usa o cursor para inserir os dados CSV na tabela 'noticias'.

Lança

psycopg2.Error: Se houver erro ao inserir o registro.

Retorna

None

Source code in source/utils.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def insert_request_df(df: pd.DataFrame) -> None:
    """
    Insere um dataframe em um banco de dados PostgreSQL.

    Args:
        df (pd.DataFrame): O dataframe a ser inserido.

    Esta função se conecta a um banco de dados PostgreSQL, cria um cursor, 
    converte o dataframe para uma string CSV, escreve a string CSV em um 
    objeto StringIO e usa o cursor para inserir os dados CSV na tabela 
    'noticias'.

    Lança:
        psycopg2.Error: Se houver erro ao inserir o registro.

    Retorna:
        None
    """

    # Obtém a configuração do banco de dados
    params = config_db()

    # Conecta-se ao banco de dados
    print('Conectando ao banco de dados PostgreSQL ...')
    connection = psycopg2.connect(**params)

    # Cria um cursor
    cursor = connection.cursor()

    try:
        # Converte dataframe para string CSV
        sio = StringIO()
        writer = csv.writer(sio)
        writer.writerows(df.values)
        sio.seek(0)

        # Insere dados CSV no banco de dados
        cursor.copy_expert(
            sql="""COPY raw_db.news (
                    author, 
                    title, 
                    description, 
                    url, 
                    image_url, 
                    publication_date,
                    content,
                    tags,
                    source,
                    query0,
                    query1,
                    query2
                ) FROM STDIN WITH CSV""",
            file=sio
        )

        # Confirma a transação
        connection.commit()

        print("Registro inserido com sucesso!")

    except psycopg2.Error as e:
        print("Erro ao inserir o registro:", e)

    finally:
        # Fecha o cursor e a conexão
        if cursor:
            cursor.close()
        if connection:
            connection.close()

update_db_silver()

Uma vez por dia sumariza as informaçoes do raw_db em um db processado

Source code in source/utils.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def update_db_silver() -> None:
    """
    Uma vez por dia sumariza as informaçoes 
    do raw_db em um db processado
    """

    params = config_db()

    try:
        with open('consulta.sql', 'r') as file:
            sql_query = file.read()

        engine = create_engine(f"postgresql://{params['user']}:{params['password']}@{params['host']}:{params['port']}/{params['dbname']}")
        df = pd.read_sql(sql_query, engine)

        df = df[['title','author','description','url','publication_date', 'source','query0','query1','query2']]
        df.dropna(subset=['title', 'url', 'author', 'publication_date', 'source'], inplace =True)

        insert_source(df)
        insert_author(df)

        df = execute_join_source(df)
        df = execute_join_author(df)

        with engine.connect() as conn:
            for _, row in df.iterrows():

                escaped_title = escape_string(row['title'])
                escaped_description = escape_string(row['description'])
                escaped_url = escape_string(row['url'])
                #escaped_publication_date = escape_string(row['publication_date'])

                query = text(f"""
                    INSERT INTO silver_db.news (title, description, url, publication_date, query0, query1, query2, source_id, author_id)
                    VALUES ('{escaped_title}', '{escaped_description}', '{escaped_url}', '{row['publication_date']}', '{row['query0']}', '{row['query1']}', '{row['query2']}', '{row['source_id']}', '{row['author_id']}')
                    ON CONFLICT (title, url) DO NOTHING;
                    """)
                conn.execute(query)  # Passando os valores do DataFrame para a consulta SQL

            # Confirmar a transação após inserções
            conn.commit()

        print("Dados inseridos com sucesso!")

    except Exception as e:
        print("Erro ao inserir dados:", e)

update_raw_db()

Cada uma hora faz request para a API e insere os dados em um DataFrame.

Esta função faz uma requisição HTTP para a API fornecida pelo método create_url_filter() e insere os dados na base de dados definida no arquivo config.py.

Source code in source/utils.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def update_raw_db() -> None:
    """
    Cada uma hora faz request para a API e insere os dados em um DataFrame.

    Esta função faz uma requisição HTTP para a API fornecida pelo método `create_url_filter()`
    e insere os dados na base de dados definida no arquivo `config.py`.
    """

    # Cria uma lista de URL para a requisição da API
    urls = create_url_filter()
    df = create_dataframe(urls)

    if df is not None:
        insert_request_df(df)