
LLM application practice: automatic aggregation of financial news

The aggregator is built on crawl4ai. The first half of the code contains the file I/O helpers and the generic crawler base classes; the CLS-specific crawler follows.

```python
import asyncio
import datetime
import hashlib
import json
import os
import re
from typing import Any, Dict, List, Union

from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy


def md5(text):
    """Return the hex MD5 digest of a string (used as a stable article id)."""
    m = hashlib.md5()
    m.update(text.encode('utf-8'))
    return m.hexdigest()


def get_datas(file_path, json_flag=True, all_flag=False, mode='r'):
    """Read a text file; parse per line (or the whole file) as JSON when json_flag is set."""
    with open(file_path, mode, encoding='utf-8') as f:
        lines = f.readlines()
    if all_flag:
        text = ''.join(lines)
        return json.loads(text) if json_flag else text
    return [json.loads(line) if json_flag else line.strip() for line in lines]


def save_datas(file_path, datas, json_flag=True, all_flag=False, with_indent=False, mode='w'):
    """Save data to a text file, one JSON object per line unless all_flag is set."""
    with open(file_path, mode, encoding='utf-8') as f:
        if all_flag:
            if json_flag:
                f.write(json.dumps(datas, ensure_ascii=False, indent=4 if with_indent else None))
            else:
                f.write(''.join(datas))
        else:
            for data in datas:
                if json_flag:
                    f.write(json.dumps(data, ensure_ascii=False) + '\n')
                else:
                    f.write(data + '\n')


class AbstractAICrawler:

    def __init__(self) -> None:
        pass

    def crawl(self):
        raise NotImplementedError()


class AINewsCrawler(AbstractAICrawler):

    def __init__(self, domain) -> None:
        super().__init__()
        self.domain = domain
        self.file_path = f'data/{self.domain}.json'
        self.history = self.init()

    def init(self):
        if not os.path.exists(self.file_path):
            return {}
        return {ele['id']: ele for ele in get_datas(self.file_path)}

    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=list(self.history.values()))

    async def crawl(self,
                    url: str,
                    schema: Dict[str, Any] = None,
                    always_by_pass_cache=True,
                    bypass_cache=True,
                    headless=True,
                    verbose=False,
                    magic=True,
                    page_timeout=15000,
                    delay_before_return_html=2.0,
                    wait_for='',
                    js_code=None,
                    js_only=False,
                    screenshot=False,
                    headers={}):
        extraction_strategy = JsonCssExtractionStrategy(schema, verbose=verbose) if schema else None
        async with AsyncWebCrawler(verbose=verbose,
                                   headless=headless,
                                   always_by_pass_cache=always_by_pass_cache,
                                   headers=headers) as crawler:
            result = await crawler.arun(
                url=url,
                extraction_strategy=extraction_strategy,
                bypass_cache=bypass_cache,
                page_timeout=page_timeout,
                delay_before_return_html=delay_before_return_html,
                wait_for=wait_for,
                js_code=js_code,
                magic=magic,
                remove_overlay_elements=True,
                process_iframes=True,
                exclude_external_links=True,
                js_only=js_only,
                screenshot=screenshot
            )
            assert result.success, "Failed to crawl the page"
            if schema:
                res = json.loads(result.extracted_content)
                if screenshot:
                    return res, result.screenshot
                return res
            # Without a schema, fall back to the page content as markdown.
            return result.markdown


class FinanceNewsCrawler(AINewsCrawler):

    def __init__(self, domain='') -> None:
        super().__init__(domain)

    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=datas, mode='a')

    async def get_last_day_data(self):
        last_day = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        datas = self.init()
        return [v for v in datas.values() if last_day in v['date']]
```
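The md5-keyed history plus the JSONL helpers are what make repeat runs incremental: each article is keyed by the MD5 of its URL, and previously seen ids are skipped. Below is a minimal sketch of that round trip, assuming the helpers defined above; the file name and URLs are illustrative only, not from the original post.

```python
# Illustrative only: exercises md5/save_datas/get_datas defined above.
import os

os.makedirs('data', exist_ok=True)
items = [
    {'id': md5('https://example.com/a'), 'title': 'demo A', 'date': '2024-12-16'},
    {'id': md5('https://example.com/b'), 'title': 'demo B', 'date': '2024-12-16'},
]
save_datas('data/demo.json', datas=items)                       # one JSON object per line
history = {ele['id']: ele for ele in get_datas('data/demo.json')}
print(md5('https://example.com/a') in history)                  # True -> already crawled, skip it
```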
The CLS-specific crawler walks the depth channels (headlines, A-shares, global), loads more items via injected JavaScript, crawls each detail page, and returns the previous day's articles. Note that the CSS selectors and the site's base URL were truncated when the original post was published; they are kept as placeholders below and must be restored before running.

```python
class CLSCrawler(FinanceNewsCrawler):
    """Cailianshe (CLS) depth-news crawler."""

    def __init__(self) -> None:
        self.domain = 'cls'
        super().__init__(self.domain)
        self.url = ''  # base site URL; stripped in the original post, fill in before running

    async def crawl_url_list(self, url='/depth?id=1000'):
        # The selectors below were truncated in the original post and are kept as placeholders.
        schema = {
            'name': 'CLS headlines page crawler',
            'baseSelector': '-left',
            'fields': [
                {
                    'name': 'top_titles',
                    'selector': '-top-article-list',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'sec_titles',
                    'selector': '-top-article-list -l',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'bottom_titles',
                    'selector': '-t-1',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href', 'selector': 'a[href]'}
                    ]
                }
            ]
        }
        js_commands = [
            """
            (async () => {
                await new Promise(resolve => setTimeout(resolve, 500));
                const targetItemCount = 100;
                // Selectors truncated in the original post; restore before running.
                let currentItemCount = document.querySelectorAll('-t-1 -w-b').length;
                let loadMoreButton = document.querySelector('.-button');
                while (currentItemCount < targetItemCount) {
                    window.scrollTo(0, document.body.scrollHeight);
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    if (loadMoreButton) {
                        loadMoreButton.click();
                    } else {
                        console.log('Load More button not found');
                        break;
                    }
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    currentItemCount = document.querySelectorAll('-t-1 -w-b').length;
                    loadMoreButton = document.querySelector('.-button');
                }
                console.log(`loaded ${currentItemCount} items`);
                return currentItemCount;
            })();
            """
        ]
        wait_for = ''
        results = {}
        menu_dict = {
            '1000': 'headlines',
            '1003': 'A-shares',
            '1007': 'global'
        }
        for k, v in menu_dict.items():
            url = f'{self.url}/depth?id={k}'
            try:
                links = await super().crawl(url, schema,
                                            always_by_pass_cache=True,
                                            bypass_cache=True,
                                            js_code=js_commands,
                                            wait_for=wait_for,
                                            js_only=False)
            except Exception as e:
                print(f'error {url}: {e}')
                links = []
            if links:
                links = [ele['href'] for eles in links[0].values() for ele in eles if 'href' in ele]
                links = sorted(set(links), key=lambda x: x)
                results.update({f'{self.url}{ele}': v for ele in links})
        return results

    async def crawl_newsletter(self, url, category):
        # The selectors below were truncated in the original post and are kept as placeholders.
        schema = {
            'name': 'CLS news detail page',
            'baseSelector': '-left',
            'fields': [
                {'name': 'title', 'selector': '-title-content', 'type': 'text'},
                {'name': 'time', 'selector': '-r-10', 'type': 'text'},
                {
                    'name': 'abstract',
                    'selector': '-brief',
                    'type': 'text',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href', 'selector': 'a[href]'}
                    ]
                },
                {
                    'name': 'contents',
                    'selector': '-content p',
                    'type': 'list',
                    'fields': [
                        {'name': 'content', 'type': 'text'}
                    ]
                },
                {'name': 'read_number', 'selector': '-option-readnumber', 'type': 'text'}
            ]
        }
        wait_for = '-content'
        try:
            results = await super().crawl(url, schema,
                                          always_by_pass_cache=True,
                                          bypass_cache=True,
                                          wait_for=wait_for)
            result = results[0]
        except Exception as e:
            print(f'crawler error: {url}: {e}')
            return {}
        return {
            'title': result['title'],
            'abstract': result['abstract'],
            'date': result['time'],
            'link': url,
            'content': '\n'.join([ele['content'] for ele in result['contents']
                                  if 'content' in ele and ele['content']]),
            'id': md5(url),
            'type': category,
            'read_number': await self.get_first_float_number(result['read_number'], r'[-+]?\d*\.\d+|\d+'),
            'time': datetime.datetime.now().strftime('%Y-%m-%d')
        }

    async def get_first_float_number(self, text, pattern):
        match = re.search(pattern, text)
        if match:
            return round(float(match.group()), 4)
        return 0

    async def crawl(self):
        link_2_category = await self.crawl_url_list()
        for link, category in link_2_category.items():
            _id = md5(link)
            if _id in self.history:
                continue
            news = await self.crawl_newsletter(link, category)
            if news:
                self.save(news)
        return await self.get_last_day_data()


if __name__ == '__main__':
    asyncio.run(CLSCrawler().crawl())
```
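To turn this into a daily aggregation job, the entry point can be wrapped so that the previous day's items returned by crawl() are printed or handed off to the downstream summarization step. A minimal driver sketch follows; the function name is mine, not from the original post.

```python
# Illustrative driver (name is an assumption): crawl the CLS channels and list yesterday's items.
async def run_daily_aggregation():
    news_list = await CLSCrawler().crawl()
    for news in news_list:
        print(news['date'], news['type'], news['title'], news['link'])

# asyncio.run(run_daily_aggregation())
```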