from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
import json
from typing import Dict, Any, Union, List
from bs4 import BeautifulSoup
from file_util import *
import os
import datetime
import re
import requests


class AbstractAICrawler():

    def __init__(self) -> None:
        pass

    def crawl(self):
        raise NotImplementedError()


class AINewsCrawler(AbstractAICrawler):
    def __init__(self, domain) -> None:
        super().__init__()
        self.domain = domain
        self.file_path = f'data/{self.domain}.json'
        self.history = self.init()

    def init(self):
        # Load previously crawled items keyed by id; start fresh if no history file exists.
        if not os.path.exists(self.file_path):
            return {}
        return {ele['id']: ele for ele in get_datas(self.file_path)}

    def save(self, datas: Union[List, Dict]):
        if isinstance(datas, dict):
            datas = [datas]
        self.history.update({ele['id']: ele for ele in datas})
        save_datas(self.file_path, datas=list(self.history.values()))

    async def crawl(self, url: str, schema: Dict[str, Any] = None):
        extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True) if schema else None
        async with AsyncWebCrawler(verbose=True) as crawler:
            result = await crawler.arun(
                url=url,
                extraction_strategy=extraction_strategy,
                bypass_cache=True,
            )
            assert result.success, "Failed to crawl the page"
            if schema:
                return json.loads(result.extracted_content)
            return result.cleaned_html


class AIBasesCrawler(AINewsCrawler):
    def __init__(self) -> None:
        self.domain = 'aibase'
        super().__init__(self.domain)
        self.url = 'https://www.aibase.com'

    async def crawl_home(self, url='https://www.aibase.com/news'):
        schema = {
            'name': 'ai base home page crawler',
            'baseSelector': '.flex',
            'fields': [
                {
                    'name': 'link',
                    'selector': 'a[rel="noopener noreferrer"]',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href'}
                    ]
                }
            ]
        }
        links = await super().crawl(url, schema)
        links = [link['href'] for ele in links for link in ele['link']]
        # Keep only article links (relative paths under /news), prefix the site root, and dedupe.
        links = list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/news')]))
        links = sorted(links, key=lambda x: x, reverse=True)
        return links

    async def crawl_newsletter_cn(self, url):
        html = await super().crawl(url)
        body = BeautifulSoup(html, 'html.parser')
        title = body.select_one('h1').get_text().replace('\u200b', '').strip()
        # Chinese pages carry a date like "2024年5月20号 10:23" in a <span>; take the first match.
        date = [ele.get_text().strip() for ele in body.find_all('span')
                if re.findall(r'(\d{4}年\d{1,2}月\d{1,2}号)', ele.get_text().strip())][0]
        date = datetime.datetime.strptime(date, '%Y年%m月%d号 %H:%M').strftime('%Y-%m-%d')
        content = '\n'.join([ele.get_text().strip().replace('\n', '').replace(' ', '') for ele in body.find_all('p')])
        # Trim the trailing "划重点:" (key points) section if present.
        content = content[:content.index('划重点:')].strip() if '划重点:' in content else content
        return {
            'title': title,
            'link': url,
            'content': content,
            'date': date
        }

    async def crawl_home_cn(self, url='https://www.aibase.com/zh/news'):
        schema = {
            'name': 'ai base home page crawler',
            'baseSelector': '.flex',
            'fields': [
                {
                    'name': 'link',
                    'selector': 'a[rel="noopener noreferrer"]',
                    'type': 'nested_list',
                    'fields': [
                        {'name': 'href', 'type': 'attribute', 'attribute': 'href'}
                    ]
                }
            ]
        }
        links = await super().crawl(url, schema)
        links = [link['href'] for ele in links for link in ele['link']]
        links = list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/zh/news')]))
        links = sorted(links, key=lambda x: x, reverse=True)
        return links

    async def crawl_newsletter(self, url):
        html = await super().crawl(url)
        body = BeautifulSoup(html, 'html.parser')
        title = body.select_one('h1').get_text().replace('\u200b', '').strip()
        # English pages carry a date like "May 20, 2024"; join all spans and pull the first match.
        date = ';'.join([ele.get_text().strip() for ele in body.find_all('span')])
        date = re.findall(r'(\b\w{3}\s+\d{1,2},\s+\d{4}\b)', date)[0]
        date = datetime.datetime.strptime(date, '%b %d, %Y').strftime('%Y-%m-%d')
        content = '\n'.join([ele.get_text().strip().replace('\n', '') for ele in body.find_all('p')])
        content = content[:content.index('Key Points:')].strip() if 'Key Points:' in content else content
        # Take the first captioned image as the article's cover picture and save it locally.
        pic_urls = [ele.get('src').strip() for ele in body.find_all('img') if ele.get('title')]
        pic_url = pic_urls[0] if pic_urls else ''
        pic_url = pic_url.replace('\\"', '')
        pic_path = ''
        if pic_url:
            pic_path = f'data/images/{md5(url)}.jpg'
            response = requests.get(pic_url)
            if response.status_code == 200:
                with open(pic_path, 'wb') as f:
                    f.write(response.content)
        return {
            'title': title,
            'link': url,
            'content': content,
            'date': date,
            'pic': pic_path,
            'id': md5(url)
        }

    async def crawl(self):
        links = await self.crawl_home()
        results = []
        for link in links:
            _id = md5(link)
            # Skip links that were already crawled in a previous run.
            if _id in self.history:
                continue
            results.append({
                'id': _id,
                'link': link,
                'contents': await self.crawl_newsletter(link),
                'time': datetime.datetime.now().strftime('%Y-%m-%d')
            })
        self.save(results)
        return await self.get_last_day_data()

    async def get_last_day_data(self):
        last_day = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        datas = self.init()
        for v in datas.values():
            v['contents']['id'] = v['id']
        return [v['contents'] for v in datas.values() if v['contents']['date'] == last_day]
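

# Usage sketch (not part of the original module): run the AIBase crawler once and
# print yesterday's articles. It assumes the helpers imported from file_util
# (get_datas, save_datas, md5) are available and that the data/ and data/images/
# directories already exist.
if __name__ == '__main__':
    import asyncio

    async def main():
        crawler = AIBasesCrawler()
        # Crawl new links, persist them to data/aibase.json, and return yesterday's items.
        articles = await crawler.crawl()
        for article in articles:
            print(article['date'], article['title'], article['link'])

    asyncio.run(main())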