import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
import json
from typing import Dict, Any, Union, List
import os
import datetime
import re
import hashlib
def md5(text):
m = hashlib.md5()
    m.update(text.encode('utf-8'))
    return m.hexdigest()
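# md5(url) is used below as a stable, filename-safe record id for de-duplication.
# Illustrative only: md5('https://example.com/a') -> a 32-character hex digest (the URL is made up).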
def get_datas(file_path, json_flag=True, all_flag=False, mode='r'):
    """Read a text file: one JSON object per line by default, or the whole file when all_flag is set."""
    with open(file_path, mode, encoding='utf-8') as f:
        if all_flag:
            content = f.read()
            return json.loads(content) if json_flag else content
        results = []
        for line in f:
            results.append(json.loads(line) if json_flag else line.strip())
        return results
def save_datas(file_path, datas, json_flag=True, all_flag=False, with_indent=False, mode='w'):
    """Save data to a text file: one JSON object per line by default, or the whole payload when all_flag is set."""
    with open(file_path, mode, encoding='utf-8') as f:
        if all_flag:
            if json_flag:
                f.write(json.dumps(datas, ensure_ascii=False, indent=4 if with_indent else None))
            else:
                f.write(''.join(datas))
        else:
            for data in datas:
                if json_flag:
                    f.write(json.dumps(data, ensure_ascii=False) + '\n')
                else:
                    f.write(data + '\n')
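# Illustrative round trip (the path is an example, not part of the crawler flow):
#   save_datas('data/example.json', [{'id': '1'}, {'id': '2'}])   # one JSON object per line
#   records = get_datas('data/example.json')                      # -> [{'id': '1'}, {'id': '2'}]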
class AbstractAICrawler():
def __init__(self) -> None:
pass
    def crawl(self):
        raise NotImplementedError()
class AINewsCrawler(AbstractAICrawler):
    def __init__(self, domain) -> None:
        super().__init__()
        self.domain = domain
        self.file_path = f'data/{self.domain}.json'
        # previously crawled items, keyed by md5 id
        self.history = self.init()

    def init(self):
        if not os.path.exists(self.file_path):
            return {}
        return {ele['id']: ele for ele in get_datas(self.file_path)}

    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=list(self.history.values()))
async def crawl(self, url:str,
schema: Dict[str, Any]=None,
always_by_pass_cache=True,
bypass_cache=True,
headless=True,
verbose=False,
magic=True,
page_timeout=15000,
delay_before_return_html=2.0,
wait_for='',
js_code=None,
js_only=False,
screenshot=False,
headers={}):
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=verbose) if schema else None
async with AsyncWebCrawler(verbose=verbose,
headless=headless,
always_by_pass_cache=always_by_pass_cache, headers=headers) as crawler:
            result = await crawler.arun(
url=url,
extraction_strategy=extraction_strategy,
bypass_cache=bypass_cache,
page_timeout=page_timeout,
delay_before_return_html=delay_before_return_html,
wait_for=wait_for,
js_code=js_code,
magic=magic,
remove_overlay_elements=True,
process_iframes=True,
exclude_external_links=True,
js_only=js_only,
screenshot=screenshot
)
assert , "Failed to crawl the page"
if schema:
res = (result.extracted_content)
if screenshot:
return res,
return res
return
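    # With a schema, crawl() returns the list parsed from JsonCssExtractionStrategy's JSON output
    # (plus the screenshot when screenshot=True); without a schema it falls back to the page markdown.
    # Illustrative call (names are examples only):
    #   items = await AINewsCrawler('demo').crawl('https://example.com', schema=some_schema)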
class FinanceNewsCrawler(AINewsCrawler):
def __init__(self, domain='') -> None:
super().__init__(domain)
def save(self, datas: Union[List, Dict]):
if isinstance(datas, dict):
datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
save_datas(self.file_path, datas=datas, mode='a')
async def get_last_day_data(self):
        """Return items whose date falls on the previous calendar day."""
        last_day = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        datas = self.init()  # re-read the file to pick up freshly appended items
        return [v for v in datas.values() if last_day in v['date']]
class CLSCrawler(FinanceNewsCrawler):
"""
    Crawler for CLS (财联社) news.
"""
def __init__(self) -> None:
        self.domain = 'cls'
        super().__init__(self.domain)
        self.url = 'https://www.cls.cn'  # base URL of the CLS site
async def crawl_url_list(self, url='/depth?id=1000'):
schema = {
            'name': 'CLS headlines page crawler',
'baseSelector': '-left',
'fields': [
{
'name': 'top_titles',
'selector': '-top-article-list',
'type': 'nested_list',
'fields': [
{'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
]
},
{
'name': 'sec_titles',
'selector': '-top-article-list -l',
'type': 'nested_list',
'fields': [
{'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
]
},
{
'name': 'bottom_titles',
'selector': '-t-1 ',
'type': 'nested_list',
'fields': [
{'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
]
}
]
}
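        # JsonCssExtractionStrategy emits one dict per baseSelector match; each dict holds the three
        # *_titles lists of {'href': ...} entries, which is why the links are flattened from
        # links[0].values() further down.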
        js_commands = [
            """
            (async () => {
                await new Promise(resolve => setTimeout(resolve, 500));
                const targetItemCount = 100;
                // the class selectors below are specific to the target page's markup
                let currentItemCount = document.querySelectorAll('-t-1 -w-b').length;
                let loadMoreButton = document.querySelector('.-button');
                while (currentItemCount < targetItemCount) {
                    // scroll to the bottom to trigger lazy loading, then click "load more" if present
                    window.scrollTo(0, document.body.scrollHeight);
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    if (loadMoreButton) {
                        loadMoreButton.click();
                    } else {
                        console.log('Load More button not found');
                        break;
                    }
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    currentItemCount = document.querySelectorAll('-t-1 -w-b').length;
                    loadMoreButton = document.querySelector('.-button');
                }
                console.log(`loaded ${currentItemCount} items`);
                return currentItemCount;
            })();
            """
        ]
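        # crawl4ai executes the snippets in js_code inside the page before the HTML is captured,
        # so the scroll-and-click loop above makes the lazily loaded article list render enough
        # items for the CSS extraction schema to pick up.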
wait_for = ''
results = {}
        menu_dict = {
            '1000': 'headlines',   # 头条
            '1003': 'A-shares',    # A股
            '1007': 'global'       # 环球
        }
        for k, v in menu_dict.items():
            url = f'{self.url}/depth?id={k}'
            try:
                links = await super().crawl(url, schema, always_by_pass_cache=True, bypass_cache=True,
                                            js_code=js_commands, wait_for=wait_for, js_only=False)
            except Exception as e:
                print(f'error crawling {url}: {e}')
                links = []
            if links:
                links = [ele['href'] for eles in links[0].values() for ele in eles if 'href' in ele]
                links = sorted(set(links))
                results.update({f'{self.url}{ele}': v for ele in links})
        return results
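    # crawl_url_list() therefore returns a mapping of absolute article URL -> category label,
    # e.g. {'https://www.cls.cn/detail/...': 'headlines', ...} (the detail path is illustrative).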
async def crawl_newsletter(self, url, category):
schema = {
            'name': 'CLS news detail page',
'baseSelector': '-left',
'fields': [
{
'name': 'title',
'selector': '-title-content',
'type': 'text'
},
{
'name': 'time',
'selector': '-r-10',
'type': 'text'
},
{
'name': 'abstract',
'selector': '-brief',
'type': 'text',
'fields': [
{'name': 'href', 'type': 'attribute', 'attribute':'href', 'selector': 'a[href]'}
]
},
{
'name': 'contents',
'selector': '-content p',
'type': 'list',
'fields': [
{'name': 'content', 'type': 'text'}
]
},
{
'name': 'read_number',
'selector': '-option-readnumber',
'type': 'text'
}
]
}
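        # 'contents' extracts each paragraph of the article body as a {'content': ...} item; the
        # paragraphs are joined back into a single text block in the return value below.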
wait_for = '-content'
try:
results = await super().crawl(url, schema, always_by_pass_cache=True, bypass_cache=True, wait_for=wait_for)
result = results[0]
        except Exception as e:
            print(f'crawler error: {url}: {e}')
            return {}
return {
'title': result['title'],
'abstract': result['abstract'],
'date': result['time'],
'link': url,
'content': '\n'.join([ele['content'] for ele in result['contents'] if 'content' in ele and ele['content']]),
'id': md5(url),
'type': category,
'read_number': await self.get_first_float_number(result['read_number'], r'[-+]?\d*\.\d+|\d+'),
            'time': datetime.datetime.now().strftime('%Y-%m-%d')
}
    async def get_first_float_number(self, text, pattern):
        """Extract the first number that appears in text (e.g. a read count) as a float."""
        match = re.search(pattern, text)
        if match:
            return round(float(match.group()), 4)
        return 0
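    # Illustrative: get_first_float_number('12.3万', r'[-+]?\d*\.\d+|\d+') returns 12.3, and a plain
    # integer string such as '1024' comes back as 1024.0 (the sample strings are made up).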
async def crawl(self):
link_2_category = await self.crawl_url_list()
for link, category in link_2_category.items():
_id = md5(link)
            if _id in self.history:  # skip articles crawled on a previous run
                continue
            news = await self.crawl_newsletter(link, category)
            if news:
                self.save(news)
return await self.get_last_day_data()
if __name__ == '__main__':
    asyncio.run(CLSCrawler().crawl())
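    # Note: results are appended to data/cls.json, so the data/ directory must exist before the
    # first run (e.g. create it with os.makedirs('data', exist_ok=True)); crawl() returns only
    # the items dated the previous day via get_last_day_data().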