-
This tool was originally designed to obtain information about WeChat accounts and parse the database of the PC version of WeChat.
-
Developed in Python, the program reads, decrypts and restores the WeChat database and helps the user to view the chat logs, as well as exporting the chat logs to csv, html and other formats for AI training, auto-replying or backups and so on. Below we'll dive into the various aspects of this tool and how it works.
-
This project is for learning and communication use only, strictly prohibited for commercial use or illegal way, any violation of laws and regulations, infringement of the legitimate rights and interests of others, have nothing to do with this project and its developers, the consequences of the behavior of the person to bear.
[Full demo tool download]
/?aid=23
Our previous post, TheHijacking WeChat chat logs and analyzing and restoring them -- merging the decrypted databases (III)We have already explained how to merge the decrypted WeChat database into a whole .db file, so this time we will introduce the structure of the WeChat database and the role of each database in detail.
Brief description of each database of WeChat PC
-
Description: for ... Overview of the decrypted contents of the individual files under /WeChat Files/wxid_xxxxxxxxx/Msg.
-
Unless otherwise specified, "chat log data" refers to data that is structurally identical or similar to the complete chat log data in the Multi folder.
I. Micro letter small program related
Data related to WeChat Small Program, including but not limited to:
-
You have used the applet RecentWxApp
-
Starred applets StarWxApp
-
Basic information about each applet WAContact
Not very useful, but you can see the name and icon of the applet you've used, as well as the applet's AppID
Second, enterprise wechat related
BizChat
Enterprise WeChat contact data, including but not limited to:
-
Enterprise WeChat session ChatInfo accessible in WeChat
-
A portion of the session's information, ChatSession (unconfirmed relationship to ChatInfo; the Content field in this is the most recent message, suspected to be used for caching content for display)
-
All corporate WeChat user identities involved in chats, including group chats UsrInfo
-
The enterprise WeChat identity MyUsrInfo that this WeChat account is bound to
-
Special Note: It has not been verified in detail whether the chats include chats initiated with Enterprise WeChat users using regular WeChat identities or only chats initiated with other Enterprise WeChat identities using Enterprise WeChat identities that are bound to regular WeChat.
BizChatMsg
-
Enterprise WeChat chat log data, including all the data of chatting with Enterprise WeChat.
-
As with BizChat, it's undetermined whether the scope involved is just Enterprise WeChat-Enterprise WeChat or both regular WeChat-Enterprise WeChat.
-
Also, the difference between the messages here and the real WeChat messages in the Multi folder is that there is no split database.
OpenIM Prefix
-
This is also the data of enterprise WeChat, including contacts, enterprise information, and messages with enterprise WeChat contacts.
-
This is the data of ordinary WeChat-Enterprise WeChat, the above biz prefix is Enterprise WeChat-Enterprise WeChat
-
This is not commonly used and there are no brand new data structures, so I won't go into further detail.
PublicMsg
- It looks like a notification message from Enterprise WeChat, which can be interpreted as an enterprise application message from Enterprise WeChat
III. Micro letter function related
Emotion
As the name suggests emoji related, including but not limited to the following:
-
CustomEmotion: as the name suggests, a GIF emoji manually uploaded by the user, includes a download link, but it seems to be encrypted (there's an aesKey field inside but I didn't test it)
-
EmotionDes1 and EmotionItem should be similar, haven't really looked into it.
-
EmotionPackageItem: a list of collections of emoticons added to the account (the kind you download from the store)
ps: not very useful, WeChat's MSG folder has url links to emoticons, you can directly network access to emoticons in chat logs.
Favorite
-
FavItems: list of favorite message entries
-
FavDataItem: collection specific data. Presumably the following two points can be identified
-
Even simply bookmarking a public post will have a corresponding record in the FavDataItem
-
For messages of the collected merge forwarding type, each message in the merge forwarding is a separate record in the FavDataItem
-
FavTags: added tags for favorite content
Misc
- There are two tables, BizContactHeadImg and ContactHeadImg1, which should be the individual avatars in binary format
Sns
Data related to the WeChat circle of friends:
-
FeedsV20: XML data for circles of friends
-
CommentV20: Record of Likes or Comments on Friend Circle
-
NotificationV7: Circle of Friends Notification
-
SnsConfigV20: some configuration information that can be read is that it has a background image of your circle of friends in it
-
SnsGroupInfoV5: Guess it's a visible or invisible list of the visible range of the old wechat friends circle
FTS (search)
- Databases prefixed with FTS are probably all related to Full-Text Search (that's the search box on WeChat)
FTSContact
There's a bunch of tables.
- FTSChatroom15_content and FTSContact15_content correspond to the message sessions (including public numbers, etc.) that will be displayed in WeChat and all the people that will appear in Contacts (sometimes not all contacts will appear in Chat). The information includes nicknames, note names and micro signals, and also matches the fields that WeChat supports searching.
FTSFavorite
Search the index of favorite content
- The naming scheme is similar to the one above
PS: For favorite content searching by text, the computer version does this by stitching everything together into a super long string. This is fine for text, links, etc., but for merged forwarded messages, the keyword search [image] comes up.
MultiSearchChatMsg
-
This database has a different prefix, but looking at the content and structure it should still be a search-related, searching for files in chat logs
-
Stores the filename and the chat it is in
-
However, there is a significant difference in the number of records between the FTSMsgSearch18_content and SessionAttachInfo tables, not sure which one is less or what.
HardLink (where the file is stored on disk)
- Pointing the filenames of documents/images/videos to the name of the folder where they are saved (e.g. 2023-04) is useful but not much.
Media
- ChatCRVoice and MediaInfo may be voice information
IV. MicroMsg (contact core)
A database that shouldn't be on par with categorization, but I think it's the most central to the analysis so far, so it's on its own.
AppInfo (table)
Some introductions to the software, guessing that it might be about the little tail of the forwarding source that certain forwards that jump directly from the mobile app to WeChat will come with
Biz Prefix
The content that is relevant to the public should be primarily relevant to the account itself.
What is certain is that the BizSessionNewFeeds table holds session information underneath the subscription's broad categories, including avatars, most recent tweets, and so on.
ChatInfo
Saves the last time each session in the Chat list was marked as read.
ChatRoom and ChatRoomInfo
Information about Storage Group Chat
-
ChatRoom: store the list of users (including the list of micro signals and the list of group nicknames) and individual group nicknames for each group chat.
-
ChatRoomInfo: information related to the group chat, mainly the content of the group announcement, not related to the members By the way, again, WeChat this position has a naming anomaly, other table prefixes are ChatRoom, and suddenly appeared a ChatroomTool
Contact
As the name suggests, contacts. Contacts here doesn't mean your friends though, but all the people you might see, and all the strangers in all the group chats in addition to your friends.
-
Contact: This table stores basic user information, including but not limited to micro-signal (strangers without friends can see it too!) ), nicknames, note names, set tags, etc., and even the pinyin of the various fields generated, probably for easy searching!
-
ContactHeadImgUrl: Headshot address
-
ContactLabel: Friend Label ID against Name
-
ExtraBuf: Stores location information, cell phone number, email, etc.
PatInfo
Saved a portion of my friend's pats for the suffix, but only a few of them, and I don't seem to remember so few pats that have shown up on my computer?
Session
The real "Chat" section shows a list of sessions, no more and no less, including special sessions like "collapsed group chats"; the information includes name, number of unread messages, last message, and so on.
TicketInfo
This table has over a hundred pieces of data in my case, but I really don't understand what it is
V. FTSMSG
The prefix FTS has been added - this stands for the index required for the search.
The main elements within it are two such tables:
-
FTSChatMsg2_content: contains three fields
-
docid: a number incremented from 1, equivalent to the ID of the current entry
-
c0content: search keywords (keywords entered in the WeChat search box can be searched by content contained in this field)
-
c1entityId: not clear what it is used for, may be validation related
-
FTSChatMsg2_MetaData
-
docid: corresponds to docid in FTSChatMsg2_content table
-
msgId: corresponds to the contents of the MSG database
-
entityId: corresponds to c1entityId in FTSChatMsg2_content table
-
type: may be the type of the message
-
The remaining fields are not yet clear
Specifically, this number 2 in the table name is a personal guess that it might be the version number of the current database format.
VI. MediaMSG (voice messaging)
This is where all voice messages are stored. The database has and only has one table, Media, containing three valid fields:
-
Key
-
Reserved0 corresponds to the MsgSvrID of the message in the MSG database.
-
Speech data in Buf silk format
VII. MSG (core database of chat logs)
The two main internal tables areMSG
cap (a poem)Name2ID
Name2ID
-
Name2ID
There is only one column in this table, and the content is formatted as a microsignal or group chat ID @chatroom -
The effect is to make certain fields in the MSG correspond to it. Although there is no ID column in the table, the fact is that MSG defaults to the ID of the first row being the number (numbered from 1).
MSG
-
localId: literally the local ID of the message, not yet found its function.
-
TalkerId: the ID of the room where the message is located (this information is a guess, see the StrTalker field for the reason for the guess), corresponding to the Name2ID.
-
MsgSvrID: guess Srv may be an abbreviation for Server, which stands for the ID of the message stored on the server side.
-
Type: message type, see Table 1 for comparison.
-
SubType: a subcategory of message types, not yet seen to be of practical use.
-
IsSender: if or not the message is sent by yourself, that is to say, mark whether the message is displayed on the left or right side of the dialog page, take the value of 0 or 1
-
CreateTime: a second timestamp of when the message was created. Further experimentation is needed to confirm which time node is marked by this time, and a personal guess of the rule is as follows:
-
Messages sent from this computer: the markers represent the moment when the send button was clicked for each message
-
Messages sent/received from other devices, from other users: the time when this message was received locally from the server is marked.
-
Sequence: order, although it looks like a millisecond timestamp it is not. It's
CreateTime
The end of the field is followed by a three-digit number consisting of, normally, 000, if in the presence of twoCreateTime
The last three digits are incremented for identical messages. Further confirmation is needed as to whether the non-repeating scope is within a session or across all sessions.CreateTime
The last three digits are incremented for identical messages. Further confirmation is needed as to whether the non-repeating scope is within a session or across all sessions. -
StatusEx, FlagEx, Status, MsgServerSeq, MsgSequence: these five fields have not been analyzed for valid information by individuals for the time being
-
StrTalker: the micro-signal of the message sender. As a special note, if you look at it from here, the above
TalkerId
The probability is that the field refers to the ID of the room where the message is located, not the sender ID, but of course it could be the same as theTalkerId
It is a duplicate content, which is to be confirmed. -
StrContent: data in string format. In particular, except for the text type of message, most other types of this field will be a piece of XML data to mark some relevant information. By parsing the XML, you can get more information, such as the width and height of the picture, the duration of the voice, and so on.
-
DisplayContent: for one shot, save the account information of the bidder and the bidder.
-
Reserved0~6: These fields have not yet been analyzed for valid information, and some fields are empty.
-
CompressContent: literally means compressed data, which actually means that the micro-trust does not want to exist in the StrContent data here (e.g. text messages with references, etc.; compressed using the lz4 compression algorithm)
-
BytesExtra: additional data in binary format
-
BytesTrans: this is a constant null field at the moment.
Table 1: Comparison of field values and meanings (possibly extendable to fields in other databases that also mark information about message type)
CategoryType | SubType | Corresponding type |
---|---|---|
1 | 0 | copies |
3 | 0 | photograph |
34 | 0 | colloquial (rather than literary) pronunciation of a Chinese character |
43 | 0 | video |
47 | 0 | Animated emoticons (third-party developed emoticons) |
49 | 1 | Similar to text messages but not the same, so far I've only seen one AliCloud invitation to register like this. Presumably it's the same as the 57 subclass |
49 | 5 | Card link with title, blurb, etc. in CompressContent and locally cached cover path in BytesExtra |
49 | 6 | file, CompressContent has the filename and download link (but won't read it), and BytesExtra has the path to the local store |
49 | 8 | GIF emoticons uploaded by users, CDN link in CompressContent, but doesn't seem to be directly accessible for downloading |
49 | 19 | Merge forwarded chats, detailed chats in CompressContent, cached images and videos in BytesExtra etc. |
49 | 33/36 | Shared applet with card information in CompressContent and cover cache location in BytesExtra |
49 | 57 | Text message with reference (in this type StrContent is empty and both sent and referenced contents are in CompressContent) |
49 | 63 | Video number live or live replay, etc. |
49 | 87 | Group Announcement |
49 | 88 | Video number live or live replay, etc. |
49 | 2000 | Transfer messages (including outgoing, incoming, unsolicited returns) |
49 | 2003 | Giveaway Bonus Cover |
10000 | 0 | System notifications (the kind of gray text that appears in the center) |
10000 | 4 | Take a shot. |
10000 | 8000 | System notifications (specifically including when you invite someone to join a group chat) |
See more at:
The structure and function of each database file of WeChat PC - Root_WeChat_PC_Directory_Format
Below is the code that implements the processing for each database:
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: # Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------import importlibimport osimport sqlite3import time from .utils import db_logerfrom dbutils.pooled_db import PooledDB # import logging## db_loger = ("db_prepare") class DatabaseSingletonBase: # _singleton_instances = {} # Using a dictionary to store differentdb_pathCorresponding single instance examples _class_name = "DatabaseSingletonBase" _db_pool = {} # Using a dictionary to store differentdb_pathCorresponding Connection Pool # def __new__(cls, *args, **kwargs): # if cls._class_name not in cls._singleton_instances: # cls._singleton_instances[cls._class_name] = super().__new__(cls) # return cls._singleton_instances[cls._class_name] @classmethod def connect(cls, db_config): """ Connecting to the database,If adding other database connections,then override the method :param db_config: Database Configuration :return: connection pool """ if not db_config: raise ValueError("db_config Cannot be empty") db_key = db_config.get("key", "") db_type = db_config.get("type", "sqlite") if db_key in cls._db_pool and cls._db_pool[db_key] is not None: return cls._db_pool[db_key] if db_type == "sqlite": db_path = db_config.get("path", "") if not (db_path): raise FileNotFoundError(f"File does not exist: {db_path}") pool = PooledDB( creator=sqlite3, # utilization sqlite3 As a connection creator maxconnections=0, # connection pool最大连接数 mincached=4, # initialization,At least one free link created in the link pool,0Indicates not created maxusage=1, # 一个链接最多被重复utilization的次数,Noneexpress an unlimited number of blocking=True, # connection pool中如果没有可用连接后,Whether to block and wait。True,wait for;False,不wait for然后报错 ping=0, # ping The database determines if the service is normal database=db_path ) elif db_type == "mysql": mysql_config = { 'user': db_config['user'], 'host': db_config['host'], 'password': db_config['password'], 'database': db_config['database'], 'port': db_config['port'] } pool = PooledDB( creator=importlib.import_module('pymysql'), # utilization mysql As a connection creator ping=1, # ping The database determines if the service is normal **mysql_config ) else: raise ValueError(f"Unsupported database types: {db_type}") db_loger.info(f"{pool} Connection handle creation {db_config}") cls._db_pool[db_key] = pool return pool class DatabaseBase(DatabaseSingletonBase): _class_name = "DatabaseBase" existed_tables = [] def __init__(self, db_config): """ db_config = { "key": "test1", "type": "sqlite", "path": r"C:\***\wxdump_work\merge_all.db" } """ = db_config = () self.__get_existed_tables() def __get_existed_tables(self): sql = "SELECT tbl_name FROM sqlite_master WHERE type = 'table' and tbl_name!='sqlite_sequence';" existing_tables = (sql) if existing_tables: self.existed_tables = [row[0].lower() for row in existing_tables] return self.existed_tables else: return None def tables_exist(self, required_tables: str or list): """ Determine if the table needed for the class exists Check if all required tables exist in the database. Args: required_tables (list or str): A list of table names or a single table name string. Returns: bool: True if all required tables exist, False otherwise. """ if isinstance(required_tables, str): required_tables = [required_tables] rbool = all(() in self.existed_tables for table in (required_tables or [])) if not rbool: db_loger.warning(f"{required_tables=}\n{self.existed_tables=}\n{rbool=}") return rbool def execute(self, sql, params=None): """ fulfillmentSQLstatement :param sql: SQLstatement (str) :param params: parameters (tuple) :return: Inquiry results (list) """ connection = () try: # connection.text_factory = bytes cursor = () if params: (sql, params) else: (sql) return () except Exception as e1: try: connection.text_factory = bytes cursor = () if params: (sql, params) else: (sql) rdata = () connection.text_factory = str return rdata except Exception as e2: db_loger.error(f"{sql=}\n{params=}\n{e1=}\n{e2=}\n", exc_info=True) return None finally: () def close(self): () db_loger.info(f"Closing the database - {}") def __del__(self): () # class MsgDb(DatabaseBase):## def p(self, *args, **kwargs):# sel = "select tbl_name from sqlite_master where type='table'"# data = (sel)# # print([i[0] for i in data])# return data### class MsgDb1(DatabaseBase):# _class_name = "MsgDb1"## def p(self, *args, **kwargs):# sel = "select tbl_name from sqlite_master where type='table'"# data = (sel)# # print([i[0] for i in data])# return data### if __name__ == '__main__':# (level=,# style='{',# datefmt='%Y-%m-%d %H:%M:%S',# format='[{levelname[0]}] {asctime} [{name}:{levelno}] {pathname}:{lineno} {message}'# )## config1 = {# "key": "test1",# "type": "sqlite",# "path": r"D:\e_all.db"# }# config2 = {# "key": "test2",# "type": "sqlite",# "path": r"D:\_call.db"# }## t1 = MsgDb(config1)# ()# t2 = MsgDb(config2)# ()# t3 = MsgDb1(config1)# ()# t4 = MsgDb1(config2)# ()## print(t4._db_pool)# # destroy (by melting or burning)t1# del t1# # destroy (by melting or burning)t2# del t2# del t3## # destroy (by melting or burning)t4# del t4# import time# (1)## t1 = MsgDb(config1)# ()# t2 = MsgDb(config2)# ()### print(t2._db_pool)
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: responsible for handlingwxCollection database# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------from collections import defaultdict from .dbbase import DatabaseBasefrom .utils import timestamp2str, xml2dict # * FavItems:List of favorite message entries# * FavDataItem:Collection-specific data。The following two points can probably be identified# * Even simply bookmarking a public post on the FavDataItem There is a corresponding record in the# * For favorite merge forwarding types of messages,Each message in the merge forwarding is in the FavDataItem It's all a separate record in the# * FavTags:Tags added for favorite content class FavoriteHandler(DatabaseBase): _class_name = "Favorite" Favorite_required_tables = ["FavItems", "FavDataItem", "FavTagDatas", "FavBindTagDatas"] def get_tags(self, LocalID): """ return: {LocalID: TagName} """ if not self.tables_exist("FavTagDatas"): return {} if LocalID is None: sql = "select LocalID, TagName from FavTagDatas order by ServerSeq" else: sql = "select LocalID, TagName from FavTagDatas where LocalID = '%s' order by ServerSeq " % LocalID tags = (sql) # [(1, 797940830, 'programming language type'), (2, 806153863, 'billing')] # Convert to Dictionary tags = {tag[0]: tag[1] for tag in tags} return tags def get_FavBindTags(self): """ return: [(FavLocalID, TagName)] """ sql = ("select DISTINCT , " "from FavBindTagDatas A, FavTagDatas B where = ") FavBindTags = (sql) return FavBindTags def get_favorite(self): """ return: [{FavItemsFields}, {FavItemsFields}] """ FavItemsFields = { "FavLocalID": "Local FavoritesID", "SvrFavId": "Server CollectionID", "SourceId": "rootID", "Type": "typology", "SourceType": "roottypology", "LocalStatus": "local state", "Flag": "markings", "Status": "state of affairs", "FromUser": "root用户", "RealChatName": "Actual Chat Name", "SearchKey": "Search Keywords", "UpdateTime": "update time", "reseverd0": "reserved field0", "XmlBuf": "XMLbuffer" } FavDataItemFields = { "FavLocalID": "Local FavoritesID", "Type": "typology", "DataId": "digitalID", "HtmlId": "HTML ID", "Datasourceid": "digitalrootID", "Datastatus": "digitalstate of affairs", "Datafmt": "digital格式", "Datatitle": "digital标题", "Datadesc": "digital描述", "Thumbfullmd5": "Thumbnail FullMD5", "Thumbhead256md5": "thumbnail header256MD5", "Thumbfullsize": "Thumbnail Full尺寸", "fullmd5": "surname QuanMD5", "head256md5": "beginning or end256MD5", "fullsize": "surname Quan尺寸", "cdn_thumburl": "CDNthumbnailURL", "cdn_thumbkey": "CDNthumbnailKEY", "thumb_width": "thumbnail宽度", "thumb_height": "thumbnail高度", "cdn_dataurl": "CDNdigitalURL", "cdn_datakey": "CDNdigitalKEY", "cdn_encryver": "CDNencrypted version", "duration": "length of time", "stream_weburl": "streaming mediaWEB URL", "stream_dataurl": "streaming mediadigitalURL", "stream_lowbandurl": "streaming media低带宽URL", "sourcethumbpath": "rootthumbnail路径", "sourcedatapath": "rootdigital路径", "stream_videoid": "streaming mediavideoID", "Rerserved1": "Reserved fields1", "Rerserved2": "Reserved fields2", "Rerserved3": "Reserved fields3", "Rerserved4": "Reserved fields4", "Rerserved5": "Reserved fields5", "Rerserved6": "Reserved fields6", "Rerserved7": "Reserved fields7" } if not self.tables_exist(["FavItems", "FavDataItem"]): return False sql1 = "select " + ",".join(()) + " from FavItems order by UpdateTime desc" sql2 = "select " + ",".join(()) + " from FavDataItem B order by asc" FavItemsList = (sql1) FavDataItemList = (sql2) if FavItemsList is None or len(FavItemsList) == 0: return False FavDataDict = {} if FavDataItemList and len(FavDataItemList) >= 0: for item in FavDataItemList: data_dict = {} for i, key in enumerate(()): data_dict[key] = item[i] FavDataDict[item[0]] = (item[0], []) + [data_dict] # Get Tags FavTags = self.get_FavBindTags() FavTagsDict = {} for FavLocalID, TagName in FavTags: FavTagsDict[FavLocalID] = (FavLocalID, []) + [TagName] rdata = [] for item in FavItemsList: processed_item = { key: item[i] for i, key in enumerate(()) } processed_item['UpdateTime'] = timestamp2str(processed_item['UpdateTime']) processed_item['XmlBuf'] = xml2dict(processed_item['XmlBuf']) processed_item['TypeName'] = Favorite_type_converter(processed_item['Type']) processed_item['FavData'] = (processed_item['FavLocalID'], []) processed_item['Tags'] = (processed_item['FavLocalID'], []) (processed_item) try: import pandas as pd except ImportError: return False pf = (FavItemsList) = () # set column names pf["UpdateTime"] = pf["UpdateTime"].apply(timestamp2str) # processing time pf["XmlBuf"] = pf["XmlBuf"].apply(xml2dict) # deal withxml pf["TypeName"] = pf["Type"].apply(Favorite_type_converter) # 添加typologyname (of a thing)列 pf["FavData"] = pf["FavLocalID"].apply(lambda x: (x, [])) # 添加digital列 pf["Tags"] = pf["FavLocalID"].apply(lambda x: (x, [])) # Adding tabbed columns pf = ("") # get rid ofNan rdata = pf.to_dict(orient="records") return rdata def Favorite_type_converter(type_id_or_name: [str, int]): """ 收藏typologyIDwith name conversion name (of a thing)(str)=>ID(int) ID(int)=>name (of a thing)(str) :param type_id_or_name: 消息typologyID或name (of a thing) :return: 消息typologyname (of a thing)或ID """ type_name_dict = defaultdict(lambda: "uncharted", { 1: "copies", # copies tested 2: "photograph", # photograph tested 3: "colloquial (rather than literary) pronunciation of a Chinese character", # colloquial (rather than literary) pronunciation of a Chinese character 4: "video", # video tested 5: "link (on a website)", # link (on a website) tested 6: "placement", # placement 7: "applet", # applet 8: "file", # file tested 14: "chat log", # chat log tested 16: "群聊video", # 群聊中的video likelihood 18: "a type of literature consisting mainly of short sketches" # a type of literature consisting mainly of short sketches tested }) if isinstance(type_id_or_name, int): return type_name_dict[type_id_or_name] elif isinstance(type_id_or_name, str): return next((k for k, v in type_name_dict.items() if v == type_id_or_name), (0, 0)) else: raise ValueError("Invalid input type")
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for handling the speech database# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------from .dbbase import DatabaseBasefrom .utils import silk2audio class MediaHandler(DatabaseBase): _class_name = "MediaMSG" Media_required_tables = ["Media"] def Media_add_index(self): """ add index, speed up query """ if self.tables_exist("Media"): ("CREATE INDEX IF NOT EXISTS MsgSvrID ON Media(Reserved0)") def get_ audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000): if not self.tables_exist("Media"): return False sql = "select Buf from Media where Reserved0=? " DBdata = (sql, (MsgSvrID,)) if not DBdata: return False if len(DBdata) == 0: return False data = DBdata[0][0] # [1:] + b'\xFF\xFF' try: pcm_data = silk2audio(buf_data=data, is_play=is_play, is_wave=is_wave, save_path=save_path, rate=rate) return pcm_data except Exception as e: return False
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for handling the contacts database# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------import logging from .dbbase import DatabaseBasefrom .utils import timestamp2str, bytes2str, db_loger, db_error import blackboxprotobuf class MicroHandler(DatabaseBase): _class_name = "MicroMsg" Micro_required_tables = ["ContactLabel", "Contact", "ContactHeadImgUrl", "Session", "ChatInfo", "ChatRoom", "ChatRoomInfo"] def Micro_add_index(self): """ Add Index, Accelerated search speed """ # because of Session 表Add Index if self.tables_exist("Session"): ("CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);") ("CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);") ("CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);") # because of Contact 表Add Index if self.tables_exist("Contact"): ("CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);") # because of ContactHeadImgUrl 表Add Index if self.tables_exist('ContactHeadImgUrl'): ("CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);") # because of ChatInfo 表Add Index if self.tables_exist('ChatInfo'): ("CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime " "ON ChatInfo(Username, LastReadedCreateTime);") ( "CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);") # because of Contact Adding a Compound Index to a Table if self.tables_exist('Contact'): ("CREATE INDEX IF NOT EXISTS idx_Contact_search " "ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);") # because of ChatRoom cap (a poem) ChatRoomInfo 表Add Index if self.tables_exist(['ChatRoomInfo', "ChatRoom"]): ("CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);") ("CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);") @db_error def get_labels(self, id_is_key=True): """ Read the list of tags :param id_is_key: id_is_key: True: id作because ofkey,False: name作because ofkey :return: """ labels = {} if not self.tables_exist("ContactLabel"): return labels sql = "SELECT LabelId, LabelName FROM ContactLabel ORDER BY LabelName ASC;" result = (sql) if not result: return labels if id_is_key: labels = {row[0]: row[1] for row in result} else: labels = {row[1]: row[0] for row in result} return labels @db_error def get_session_list(self): """ Get session list :return: session list """ sessions = {} if not self.tables_exist(["Session", "Contact", "ContactHeadImgUrl"]): return sessions sql = ( "SELECT ,,, , , , , " ", , , , S.Reserved2 AS nMsgSubType, , , " ", , , C.Reserved1, C.Reserved2, , , , " ", , C.Reserved5, C.Reserved6 as describe, , " "FROM (SELECT strUsrName, MAX(nTime) AS MaxnTime FROM Session GROUP BY strUsrName) AS SubQuery " "JOIN Session S ON = AND = " "left join Contact C ON = " "LEFT JOIN ContactHeadImgUrl H ON = " "WHERE !='@publicUser' " "ORDER BY DESC;" ) db_loger.info(f"get_session_list sql: {sql}") ret = (sql) if not ret: return sessions id2label = self.get_labels() for row in ret: (strUsrName, nOrder, nUnReadCount, strNickName, nStatus, nIsSend, strContent, nMsgLocalID, nMsgStatus, nTime, nMsgType, nMsgSubType, UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList, ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row ExtraBuf = get_ExtraBuf(ExtraBuf) LabelIDList = (",") if LabelIDList else [] LabelIDList = [(int(label_id), label_id) for label_id in LabelIDList if label_id] nTime = timestamp2str(nTime) if nTime else None sessions[strUsrName] = { "wxid": strUsrName, "nOrder": nOrder, "nUnReadCount": nUnReadCount, "strNickName": strNickName, "nStatus": nStatus, "nIsSend": nIsSend, "strContent": strContent, "nMsgLocalID": nMsgLocalID, "nMsgStatus": nMsgStatus, "nTime": nTime, "nMsgType": nMsgType, "nMsgSubType": nMsgSubType, "LastReadedCreateTime": nTime, "nickname": NickName, "remark": Remark, "account": Alias, "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "", "ExtraBuf": ExtraBuf, "LabelIDList": tuple(LabelIDList) } return sessions @db_error def get_recent_chat_wxid(self): """ Get recently chatted contacts :return: Recently Chatted Contacts """ users = [] if not self.tables_exist(["ChatInfo"]): return users sql = ( "SELECT , LastReadedCreateTime, LastReadedSvrId " "FROM ( SELECT Username, MAX(LastReadedCreateTime) AS MaxLastReadedCreateTime FROM ChatInfo " "WHERE LastReadedCreateTime IS NOT NULL AND LastReadedCreateTime > 1007911408000 GROUP BY Username " ") AS SubQuery JOIN ChatInfo A " "ON = AND LastReadedCreateTime = " "ORDER BY DESC;" ) db_loger.info(f"get_recent_chat_wxid sql: {sql}") result = (sql) if not result: return [] for row in result: # Get User Name、term of endearment、notecap (a poem)聊天记录数量 username, LastReadedCreateTime, LastReadedSvrId = row LastReadedCreateTime = timestamp2str(LastReadedCreateTime) if LastReadedCreateTime else None ( {"wxid": username, "LastReadedCreateTime": LastReadedCreateTime, "LastReadedSvrId": LastReadedSvrId}) return users @db_error def get_user_list(self, word: str = None, wxids: list = None, label_ids: list = None): """ Get contact list [ take note of:If you modify this function,To simultaneously modify theget_im_user_listfunction (math.) ] :param word: Search Keywords,can bewxid,user ID、term of endearment、note、descriptive,pinyin (Chinese romanization) :param wxids: wxidlistings :param label_ids: tab (of a window) (computing)id :return: Contact Dictionary """ if isinstance(wxids, str): wxids = [wxids] if isinstance(label_ids, str): label_ids = [label_ids] users = {} if not self.tables_exist(["Contact", "ContactHeadImgUrl"]): return users sql = ( "SELECT , , , , , A.Reserved1, A.Reserved2," ", , , , , A.Reserved5," "A.Reserved6 as describe, , " "FROM Contact A LEFT JOIN ContactHeadImgUrl B ON = WHERE 1==1 ;" ) if word: sql = (";", f"AND ( LIKE '%{word}%' " f"OR LIKE '%{word}%' " f"OR LIKE '%{word}%' " f"OR LIKE '%{word}%' " f"OR LOWER() LIKE LOWER('%{word}%') " f"OR LOWER() LIKE LOWER('%{word}%') " f"OR LOWER() LIKE LOWER('%{word}%') " f"OR LOWER() LIKE LOWER('%{word}%') " f") " ";") if wxids: sql = (";", f"AND IN ('" + "','".join(wxids) + "') ;") if label_ids: sql_label = [f" LIKE '%{i}%' " for i in label_ids] sql_label = " OR ".join(sql_label) sql = (";", f"AND ({sql_label}) ;") db_loger.info(f"get_user_list sql: {sql}") result = (sql) if not result: return users id2label = self.get_labels() for row in result: # gainwxid,term of endearment,note,descriptive,avatar,tab (of a window) (computing) (UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList, ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row ExtraBuf = get_ExtraBuf(ExtraBuf) LabelIDList = (",") if LabelIDList else [] LabelIDList = [(int(label_id), label_id) for label_id in LabelIDList if label_id] # print(f"{UserName=}\n{Alias=}\n{DelFlag=}\n{Type=}\n{VerifyFlag=}\n{Reserved1=}\n{Reserved2=}\n" # f"{Remark=}\n{NickName=}\n{LabelIDList=}\n{ChatRoomType=}\n{ChatRoomNotify=}\n{Reserved5=}\n" # f"{describe=}\n{ExtraBuf=}\n{bigHeadImgUrl=}") users[UserName] = { "wxid": UserName, "nickname": NickName, "remark": Remark, "account": Alias, "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "", "ExtraBuf": ExtraBuf, "LabelIDList": tuple(LabelIDList), "extra": None} extras = self.get_room_list(roomwxids=filter(lambda x: "@" in x, ())) for UserName in users: users[UserName]["extra"] = (UserName, None) return users @db_error def get_room_list(self, word=None, roomwxids: list = None): """ gaingroup chatlistings :param word: Group Chat Search Terms :param roomwxids: group chatwxidlistings :return: group chat字典 """ # grout comprehensive database,and execute the query if isinstance(roomwxids, str): roomwxids = [roomwxids] rooms = {} if not self.tables_exist(["ChatRoom", "ChatRoomInfo"]): return rooms sql = ( "SELECT ,,,,," ",A.Reserved2,, " ",, " "FROM ChatRoom A LEFT JOIN ChatRoomInfo B ON == " "WHERE 1==1 ;") if word: sql = (";", f"AND LIKE '%{word}%' ;") if roomwxids: sql = (";", f"AND IN ('" + "','".join(roomwxids) + "') ;") db_loger.info(f"get_room_list sql: {sql}") result = (sql) if not result: return rooms for row in result: # Get User Name、term of endearment、notecap (a poem)聊天记录数量 (ChatRoomName, UserNameList, DisplayNameList, ChatRoomFlag, IsShowName, SelfDisplayName, Reserved2, RoomData, Announcement, AnnouncementEditor, AnnouncementPublishTime) = row UserNameList = ("^G") DisplayNameList = ("^G") RoomData = ChatRoom_RoomData(RoomData) wxid2roomNickname = {} if RoomData: rd = [] for k, v in (): if isinstance(v, list): rd += v for i in rd: try: if isinstance(i, dict) and isinstance(('1'), str) and ('2'): wxid2roomNickname[i['1']] = i["2"] except Exception as e: db_loger.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}", exc_info=True) wxid2userinfo = self.get_user_list(wxids=UserNameList) for i in wxid2userinfo: wxid2userinfo[i]["roomNickname"] = (i, "") owner = (Reserved2, Reserved2) rooms[ChatRoomName] = { "wxid": ChatRoomName, "roomWxids": UserNameList, "IsShowName": IsShowName, "ChatRoomFlag": ChatRoomFlag, "SelfDisplayName": SelfDisplayName, "owner": owner, "wxid2userinfo": wxid2userinfo, "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor, "AnnouncementPublishTime": AnnouncementPublishTime} return rooms @db_errordef ChatRoom_RoomData(RoomData): # retrievegroup chat数据,主要because of wxid,以及对应term of endearment if RoomData is None or not isinstance(RoomData, bytes): return None data = get_BytesExtra(RoomData) bytes2str(data) if data else None return data @db_errordef get_BytesExtra(BytesExtra): if BytesExtra is None or not isinstance(BytesExtra, bytes): return None try: deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra) return deserialize_data except Exception as e: db_loger.warning(f"\nget_BytesExtra: {e}\n{BytesExtra}", exc_info=True) return None @db_errordef get_ExtraBuf(ExtraBuf: bytes): """ retrieveExtraBuf(contact form) :param ExtraBuf: :return: """ if not ExtraBuf: return None buf_dict = { '74752C06': 'distinguishing between the sexes[1male2daughter]', '46CF10C4': 'personalized signature', 'A4D9024A': 'country', 'E2EAA8D1': 'leave out', '1D025BBF': 'city', 'F917BCC0': 'company identification', '759378AD': 'cell phone number', '4EB96D85': 'Enterprise Micro Properties', '81AE19B4': 'Circle of Friends Background', '0E719F13': 'note图片', '945f3190': 'note图片2', 'DDF32683': '0', '88E28FCE': '1', '761A1D2D': '2', '0263A0CB': '3', '0451FF12': '4', '228C66A8': '5', '4D6C4570': '6', '4335DFDD': '7', 'DE4CDAEB': '8', 'A72BC20A': '9', '069FED52': '10', '9B0F4299': '11', '3D641E22': '12', '1249822C': '13', 'B4F73ACB': '14', '0959EB92': '15', '3CF4A315': '16', 'C9477AC60201E44CD0E8': '17', 'B7ACF0F5': '18', '57A7B5A8': '19', '695F3170': '20', 'FB083DD9': '21', '0240E37F': '22', '315D02A3': '23', '7DEC0BC3': '24', '16791C90': '25' } rdata = {} for buf_name in buf_dict: rdata_name = buf_dict[buf_name] buf_name = (buf_name) offset = (buf_name) if offset == -1: rdata[rdata_name] = "" continue offset += len(buf_name) type_id = ExtraBuf[offset: offset + 1] offset += 1 if type_id == b"\x04": rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little") elif type_id == b"\x18": length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00") elif type_id == b"\x17": length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8", errors="ignore").rstrip( "\x00") elif type_id == b"\x05": rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}" return rdata
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for processing message database data# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------import jsonimport osimport reimport blackboxprotobuf from .dbbase import DatabaseBasefrom .utils import db_error, timestamp2str, xml2dict, match_BytesExtra, type_converter class MsgHandler(DatabaseBase): _class_name = "MSG" MSG_required_tables = ["MSG"] def Msg_add_index(self): """ Add Index,Accelerated search speed """ # Checking for the existence of an index if not self.tables_exist("MSG"): return ("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker ON MSG(StrTalker);") ("CREATE INDEX IF NOT EXISTS idx_MSG_CreateTime ON MSG(CreateTime);") ("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);") @db_error def get_m_msg_count(self, wxids: list = ""): """ Get the number of chat logs,according towxidGet the number of chats for a single contact,not passed onwxidThen get the number of chats for all contacts :param wxids: wxid list :return: Chat log count list {wxid: chat_count, total: total_count} """ if isinstance(wxids, str) and wxids: wxids = [wxids] if wxids: wxids = "('" + "','".join(wxids) + "')" sql = f"SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker IN {wxids} GROUP BY StrTalker ORDER BY COUNT(*) DESC;" else: sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;" sql_total = f"SELECT COUNT(*) FROM MSG;" if not self.tables_exist("MSG"): return {} result = (sql) total_ret = (sql_total) if not result: return {} total = 0 if total_ret and len(total_ret) > 0: total = total_ret[0][0] msg_count = {"total": total} msg_count.update({row[0]: row[1] for row in result}) return msg_count @db_error def get_msg_list(self, wxids: list or str = "", start_index=0, page_size=500, msg_type: str = "", msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="me"): """ Get the list of chat logs :param wxids: [wxid] :param start_index: Starting Index :param page_size: page size :param msg_type: Message Type :param msg_sub_type: Message subtype :param start_createtime: Starting time :param end_createtime: end time :param my_talker: me :return: Chat List {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {}, "CreateTime": CreateTime, } """ if not self.tables_exist("MSG"): return [], [] if isinstance(wxids, str) and wxids: wxids = [wxids] param = () sql_wxid, param = (f"AND StrTalker in ({', '.join('?' for _ in wxids)}) ", param + tuple(wxids)) if wxids else ("", param) sql_type, param = ("AND Type=? ", param + (msg_type,)) if msg_type else ("", param) sql_sub_type, param = ("AND SubType=? ", param + (msg_sub_type,)) if msg_type and msg_sub_type else ("", param) sql_start_createtime, param = ("AND CreateTime>=? ", param + (start_createtime,)) if start_createtime else ( "", param) sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param) sql = ( "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status," "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3," "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2," "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id " "FROM MSG WHERE 1=1 " f"{sql_wxid}" f"{sql_type}" f"{sql_sub_type}" f"{sql_start_createtime}" f"{sql_end_createtime}" f"ORDER BY CreateTime ASC LIMIT ?,?" ) param = param + (start_index, page_size) result = (sql, param) if not result: return [], [] result_data = (self.get_msg_detail(row, my_talker=my_talker) for row in result) rdata = list(result_data) # Convert to list wxid_list = {d['talker'] for d in rdata} # Create a non-repeating wxid listings return rdata, list(wxid_list) @db_error def get_date_count(self, wxid='', start_time: int = 0, end_time: int = 0, time_format='%Y-%m-%d'): """ Get the number of daily chats,Including number of senders、Number of recipients and total number。 """ if not self.tables_exist("MSG"): return {} if isinstance(start_time, str) and start_time.isdigit(): start_time = int(start_time) if isinstance(end_time, str) and end_time.isdigit(): end_time = int(end_time) # if start_time or end_time is not an integer and not a float, set both to 0 if not (isinstance(start_time, (int, float)) and isinstance(end_time, (int, float))): start_time = 0 end_time = 0 params = () sql_wxid = "AND StrTalker = ? " if wxid else "" params = params + (wxid,) if wxid else params sql_time = "AND CreateTime BETWEEN ? AND ? " if start_time and end_time else "" params = params + (start_time, end_time) if start_time and end_time else params sql = (f"SELECT strftime('{time_format}', CreateTime, 'unixepoch', 'localtime') AS date, " " COUNT(*) AS total_count ," " SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, " " SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count " "FROM MSG " "WHERE StrTalker NOT LIKE '%chatroom%' " f"{sql_wxid} {sql_time} " f"GROUP BY date ORDER BY date ASC;") result = (sql, params) if not result: return {} # Convert query results to dictionary result_dict = {} for row in result: date, total_count, sender_count, receiver_count = row result_dict[date] = { "sender_count": sender_count, "receiver_count": receiver_count, "total_count": total_count } return result_dict @db_error def get_top_talker_count(self, top: int = 10, start_time: int = 0, end_time: int = 0): """ Get the number of chat logs最多的联系人,The number of their chats. """ if not self.tables_exist("MSG"): return {} if isinstance(start_time, str) and start_time.isdigit(): start_time = int(start_time) if isinstance(end_time, str) and end_time.isdigit(): end_time = int(end_time) # if start_time or end_time is not an integer and not a float, set both to 0 if not (isinstance(start_time, (int, float)) and isinstance(end_time, (int, float))): start_time = 0 end_time = 0 sql_time = f"AND CreateTime BETWEEN {start_time} AND {end_time} " if start_time and end_time else "" sql = ( "SELECT StrTalker, COUNT(*) AS count," "SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, " "SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count " "FROM MSG " "WHERE StrTalker NOT LIKE '%chatroom%' " f"{sql_time} " "GROUP BY StrTalker ORDER BY count DESC " f"LIMIT {top};" ) result = (sql) if not result: return {} # Convert query results to dictionary result_dict = {row[0]: {"total_count": row[1], "sender_count": row[2], "receiver_count": row[3]} for row in result} return result_dict # Single message processing @db_error def get_msg_detail(self, row, my_talker="me"): """ Get single message details,Formatted output """ (localId, TalkerId, MsgSvrID, Type, SubType, CreateTime, IsSender, Sequence, StatusEx, FlagEx, Status, MsgSequence, StrContent, MsgServerSeq, StrTalker, DisplayContent, Reserved0, Reserved1, Reserved3, Reserved4, Reserved5, Reserved6, CompressContent, BytesExtra, BytesTrans, Reserved2, _id) = row CreateTime = timestamp2str(CreateTime) type_id = (Type, SubType) type_name = type_converter(type_id) msg = StrContent src = "" extra = {} if type_id == (1, 0): # copies msg = StrContent elif type_id == (3, 0): # photograph DictExtra = get_BytesExtra(BytesExtra) DictExtra_str = str(DictExtra) img_paths = [i for i in (r"(FileStorage.*?)'", DictExtra_str)] img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True) if img_paths: img_path = img_paths[0].replace("'", "") img_path = [i for i in img_path.split("\\") if i] img_path = (*img_path) src = img_path else: src = "" msg = "photograph" elif type_id == (34, 0): # colloquial (rather than literary) pronunciation of a Chinese character tmp_c = xml2dict(StrContent) voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "") transtext = tmp_c.get("voicetrans", {}).get("transtext", "") if (): voicelength = int(voicelength) / 1000 voicelength = f"{voicelength:.2f}" msg = f"colloquial (rather than literary) pronunciation of a Chinese character时长:{voicelength}unit of angle or arc equivalent one sixtieth of a degree\nTranslation results:{transtext}" if transtext else f"colloquial (rather than literary) pronunciation of a Chinese character时长:{voicelength}unit of angle or arc equivalent one sixtieth of a degree" src = (f"{StrTalker}", f"{(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav") elif type_id == (43, 0): # video DictExtra = get_BytesExtra(BytesExtra) DictExtra = str(DictExtra) DictExtra_str = str(DictExtra) video_paths = [i for i in (r"(FileStorage.*?)'", DictExtra_str)] video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True) if video_paths: video_path = video_paths[0].replace("'", "") video_path = [i for i in video_path.split("\\") if i] video_path = (*video_path) src = video_path else: src = "" msg = "video" elif type_id == (47, 0): # animated expression content_tmp = xml2dict(StrContent) cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "") if not cdnurl: DictExtra = get_BytesExtra(BytesExtra) cdnurl = match_BytesExtra(DictExtra) if cdnurl: msg, src = "express one's feelings", cdnurl elif type_id == (48, 0): # Map Information content_tmp = xml2dict(StrContent) location = content_tmp.get("location", {}) msg = (f"longitude:【{('x')}】 longitudes:【{('y')}】\n" f"placement:{('label')} {('poiname')}\n" f"Other information:{(location, ensure_ascii=False, indent=4)}" ) src = "" elif type_id == (49, 0): # file DictExtra = get_BytesExtra(BytesExtra) url = match_BytesExtra(DictExtra) src = url file_name = (url) msg = file_name elif type_id == (49, 5): # (share (joys, benefits, privileges etc) with others)Carded Links CompressContent = decompress_CompressContent(CompressContent) CompressContent_tmp = xml2dict(CompressContent) appmsg = CompressContent_tmp.get("appmsg", {}) title = ("title", "") des = ("des", "") url = ("url", "") msg = f'{title}\n{des}\n\n<a href="{url}" target="_blank">Click for details</a>' src = url extra = appmsg elif type_id == (49, 19): # Merge forwarded chats CompressContent = decompress_CompressContent(CompressContent) content_tmp = xml2dict(CompressContent) title = content_tmp.get("appmsg", {}).get("title", "") des = content_tmp.get("appmsg", {}).get("des", "") recorditem = content_tmp.get("appmsg", {}).get("recorditem", "") recorditem = xml2dict(recorditem) msg = f"{title}\n{des}" src = recorditem elif type_id == (49, 57): # 带有references的copies消息 CompressContent = decompress_CompressContent(CompressContent) content_tmp = xml2dict(CompressContent) appmsg = content_tmp.get("appmsg", {}) title = ("title", "") refermsg = ("refermsg", {}) type_id = ("type", "1") displayname = ("displayname", "") display_content = ("content", "") display_createtime = ("createtime", "") display_createtime = timestamp2str( int(display_createtime)) if display_createtime.isdigit() else display_createtime if display_content and display_content.startswith("<?xml"): display_content = xml2dict(display_content) if "img" in display_content: display_content = "photograph" else: appmsg1 = display_content.get("appmsg", {}) title1 = ("title", "") display_content = title1 if title1 else display_content msg = f"{title}\n\n[references]({display_createtime}){displayname}:{display_content}" src = "" elif type_id == (49, 2000): # transfer message CompressContent = decompress_CompressContent(CompressContent) content_tmp = xml2dict(CompressContent) wcpayinfo = content_tmp.get("appmsg", {}).get("wcpayinfo", {}) paysubtype = ("paysubtype", "") # Type of transfer feedesc = ("feedesc", "") # Amount transferred pay_memo = ("pay_memo", "") # Remarks on transfers begintransfertime = ("begintransfertime", "") # transfer (money to a bank account)Starting time msg = (f"{'Received' if paysubtype == '3' else 'transfer (money to a bank account)'}:{feedesc}\n" f"transfer (money to a bank account)说明:{pay_memo if pay_memo else ''}\n" f"transfer (money to a bank account)时间:{timestamp2str(begintransfertime)}\n" ) src = "" elif type_id[0] == 49 and type_id[1] != 0: DictExtra = get_BytesExtra(BytesExtra) url = match_BytesExtra(DictExtra) src = url msg = type_name elif type_id == (50, 0): # colloquial (rather than literary) pronunciation of a Chinese character通话 msg = "colloquial (rather than literary) pronunciation of a Chinese character/video通话[%s]" % DisplayContent # elif type_id == (10000, 0): # msg = StrContent # elif type_id == (10000, 4): # msg = StrContent # elif type_id == (10000, 8000): # msg = StrContent talker = "uncharted" if IsSender == 1: talker = my_talker else: if ("@chatroom"): bytes_extra = get_BytesExtra(BytesExtra) if bytes_extra: try: talker = bytes_extra['3'][0]['2'] if "publisher-id" in talker: talker = "systems" except: pass else: talker = StrTalker row_data = {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": extra, "CreateTime": CreateTime, } return row_data @db_errordef decompress_CompressContent(data): """ decompression (esp. computer)Msg:CompressContentelement :param data: CompressContentelement bytes :return: """ if data is None or not isinstance(data, bytes): return None try: dst = (data, uncompressed_size=len(data) << 8) dst = (b'\x00', b'') # After the decoding has been completed,also contains0x00share of,delete,or else back thereETErrors are reported when recognizing uncompressed_data = ('utf-8', errors='ignore') return uncompressed_data except Exception as e: return ('utf-8', errors='ignore') @db_errordef get_BytesExtra(BytesExtra): BytesExtra_message_type = { "1": { "type": "message", "message_typedef": { "1": { "type": "int", "name": "" }, "2": { "type": "int", "name": "" } }, "name": "1" }, "3": { "type": "message", "message_typedef": { "1": { "type": "int", "name": "" }, "2": { "type": "str", "name": "" } }, "name": "3", "alt_typedefs": { "1": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": {}, "name": "" } }, "2": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "13": { "type": "fixed32", "name": "" }, "12": { "type": "fixed32", "name": "" } }, "name": "" } }, "3": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "15": { "type": "fixed64", "name": "" } }, "name": "" } }, "4": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "15": { "type": "int", "name": "" }, "14": { "type": "fixed32", "name": "" } }, "name": "" } }, "5": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "12": { "type": "fixed32", "name": "" }, "7": { "type": "fixed64", "name": "" }, "6": { "type": "fixed64", "name": "" } }, "name": "" } }, "6": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "7": { "type": "fixed64", "name": "" }, "6": { "type": "fixed32", "name": "" } }, "name": "" } }, "7": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "12": { "type": "fixed64", "name": "" } }, "name": "" } }, "8": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "6": { "type": "fixed64", "name": "" }, "12": { "type": "fixed32", "name": "" } }, "name": "" } }, "9": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "15": { "type": "int", "name": "" }, "12": { "type": "fixed64", "name": "" }, "6": { "type": "int", "name": "" } }, "name": "" } }, "10": { "1": { "type": "int", "name": "" }, "2": { "type": "message", "message_typedef": { "6": { "type": "fixed32", "name": "" }, "12": { "type": "fixed64", "name": "" } }, "name": "" } }, } } } if BytesExtra is None or not isinstance(BytesExtra, bytes): return None try: deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra, BytesExtra_message_type) return deserialize_data except Exception as e: return None
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: # Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------from .dbbase import DatabaseBasefrom .utils import db_error class OpenIMContactHandler(DatabaseBase): _class_name = "OpenIMContact" OpenIMContact_required_tables = ["OpenIMContact"] def get_im_user_list(self, word=None, wxids=None): """ Get contact list [ take note of:If you modify this function,To simultaneously modify theget_user_listfunction (math.) ] :param word: Search Keywords,Can be a username、term of endearment、note、descriptive,pinyin (Chinese romanization) :param wxids: microsoftidlistings :return: Contact Dictionary """ if not self.tables_exist("OpenIMContact"): return [] if not wxids: wxids = {} if isinstance(wxids, str): wxids = [wxids] sql = ("SELECT UserName,NickName,Type,Remark,BigHeadImgUrl,CustomInfoDetail,CustomInfoDetailVisible," "AntiSpamTicket,AppId,Sex,DescWordingId,ExtraBuf " "FROM OpenIMContact WHERE 1==1 ;") if word: sql = (";", f"AND (UserName LIKE '%{word}%' " f"OR NickName LIKE '%{word}%' " f"OR Remark LIKE '%{word}%' " f"OR LOWER(NickNamePYInit) LIKE LOWER('%{word}%') " f"OR LOWER(NickNameQuanPin) LIKE LOWER('%{word}%') " f"OR LOWER(RemarkPYInit) LIKE LOWER('%{word}%') " f"OR LOWER(RemarkQuanPin) LIKE LOWER('%{word}%') " ") ;") if wxids: sql = (";", f"AND UserName IN ('" + "','".join(wxids) + "') ;") result = (sql) if not result: return {} users = {} for row in result: # Get User Name、term of endearment、note和聊天记录数量 (UserName, NickName, Type, Remark, BigHeadImgUrl, CustomInfoDetail, CustomInfoDetailVisible, AntiSpamTicket, AppId, Sex, DescWordingId, ExtraBuf) = row users[UserName] = { "wxid": UserName, "nickname": NickName, "remark": Remark, "account": UserName, "describe": '', "headImgUrl": BigHeadImgUrl if BigHeadImgUrl else "", "ExtraBuf": None, "LabelIDList": tuple(), "extra": None} return users @db_errordef get_ExtraBuf(ExtraBuf: bytes): """ retrieveExtraBuf(contact form) :param ExtraBuf: :return: """ if not ExtraBuf: return None buf_dict = { '74752C06': 'distinguishing between the sexes[1male2women]', '46CF10C4': 'personalized signature', 'A4D9024A': 'country', 'E2EAA8D1': 'leave out', '1D025BBF': 'city', 'F917BCC0': 'company identification', '759378AD': 'cell phone number', '4EB96D85': 'Enterprise Micro Properties', '81AE19B4': 'Circle of Friends Background', '0E719F13': 'note图片', '945f3190': 'note图片2', 'DDF32683': '0', '88E28FCE': '1', '761A1D2D': '2', '0263A0CB': '3', '0451FF12': '4', '228C66A8': '5', '4D6C4570': '6', '4335DFDD': '7', 'DE4CDAEB': '8', 'A72BC20A': '9', '069FED52': '10', '9B0F4299': '11', '3D641E22': '12', '1249822C': '13', 'B4F73ACB': '14', '0959EB92': '15', '3CF4A315': '16', 'C9477AC60201E44CD0E8': '17', 'B7ACF0F5': '18', '57A7B5A8': '19', '695F3170': '20', 'FB083DD9': '21', '0240E37F': '22', '315D02A3': '23', '7DEC0BC3': '24', '16791C90': '25' } rdata = {} for buf_name in buf_dict: rdata_name = buf_dict[buf_name] buf_name = (buf_name) offset = (buf_name) if offset == -1: rdata[rdata_name] = "" continue offset += len(buf_name) type_id = ExtraBuf[offset: offset + 1] offset += 1 if type_id == b"\x04": rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little") elif type_id == b"\x18": length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-16").rstrip("\x00") elif type_id == b"\x17": length = int.from_bytes(ExtraBuf[offset: offset + 4], "little") rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8", errors="ignore").rstrip( "\x00") elif type_id == b"\x05": rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}" return rdata
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for handling the speech database# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------from .dbbase import DatabaseBasefrom .utils import silk2audio, db_loger class OpenIMMediaHandler(DatabaseBase): _class_name = "OpenIMMedia" OpenIMMedia_required _tables = ["OpenIMMedia"] def get_im_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000): if not self.tables_exist ("OpenIMMedia"): return False sql = "select Buf from OpenIMMedia where Reserved0=? " DBdata = (sql, (MsgSvrID,)) if not DBdata: return False if len(DBdata) == 0: return False data = DBdata[0][0] # [1:] + b'\xFF\xFF' try: pcm_data = silk2audio(buf_data=data, is_play=is_play, is_wave=is_wave, save_path=save_path, rate=rate) return pcm_data except Exception as e: db_loger. warning(e, exc_info=True) return False
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for handling public database information# Author: Rainbow# Date: 2024/11/08# -------------------------------------------------------------------------------from .dbMSG import MsgHandlerfrom .utils import db_error class PublicMsgHandler(MsgHandler): _class_name = "PublicMSG" PublicMSG_required_tables = ["PublicMsg"] def PublicMsg_add_index(self): """ Add Index,Accelerated search speed """ # Checking for the existence of an index if not self.tables_exist("PublicMsg"): return sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON PublicMsg(StrTalker);" (sql) sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON PublicMsg(CreateTime);" (sql) sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON PublicMsg(StrTalker, CreateTime);" (sql) @db_error def get_plc_msg_count(self, wxids: list = ""): """ Get the number of chat logs,according towxidGet the number of chats for a single contact,not passed onwxidThen get the number of chats for all contacts :param wxids: wxid list :return: Chat log count list {wxid: chat_count} """ if not self.tables_exist("PublicMsg"): return {} if isinstance(wxids, str) and wxids: wxids = [wxids] if wxids: wxids = "('" + "','".join(wxids) + "')" sql = f"SELECT StrTalker, COUNT(*) FROM PublicMsg WHERE StrTalker IN {wxids} GROUP BY StrTalker ORDER BY COUNT(*) DESC;" else: sql = f"SELECT StrTalker, COUNT(*) FROM PublicMsg GROUP BY StrTalker ORDER BY COUNT(*) DESC;" sql_total = f"SELECT COUNT(*) FROM MSG;" result = (sql) total_ret = (sql_total) if not result: return {} total = 0 if total_ret and len(total_ret) > 0: total = total_ret[0][0] msg_count = {"total": total} msg_count.update({row[0]: row[1] for row in result}) return msg_count @db_error def get_plc_msg_list(self, wxids: list or str = "", start_index=0, page_size=500, msg_type: str = "", msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="me"): """ Get the list of chat logs :param wxids: [wxid] :param start_index: Starting Index :param page_size: page size :param msg_type: Message Type :param msg_sub_type: Message subtype :param start_createtime: Starting time :param end_createtime: end time :return: Chat List {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {}, "CreateTime": CreateTime, } """ if not self.tables_exist("PublicMsg"): return [], [] if isinstance(wxids, str) and wxids: wxids = [wxids] param = () sql_wxid, param = (f"AND StrTalker in ({', '.join('?' for _ in wxids)}) ", param + tuple(wxids)) if wxids else ("", param) sql_type, param = ("AND Type=? ", param + (msg_type,)) if msg_type else ("", param) sql_sub_type, param = ("AND SubType=? ", param + (msg_sub_type,)) if msg_type and msg_sub_type else ("", param) sql_start_createtime, param = ("AND CreateTime>=? ", param + (start_createtime,)) if start_createtime else ( "", param) sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param) sql = ( "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status," "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3," "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2," "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id " "FROM PublicMsg WHERE 1=1 " f"{sql_wxid}" f"{sql_type}" f"{sql_sub_type}" f"{sql_start_createtime}" f"{sql_end_createtime}" f"ORDER BY CreateTime ASC LIMIT ?,?" ) param = param + (start_index, page_size) result = (sql, param) if not result: return [], [] result_data = (self.get_msg_detail(row, my_talker=my_talker) for row in result) rdata = list(result_data) # Convert to list wxid_list = {d['talker'] for d in rdata} # Create a non-repeating wxid listings return rdata, list(wxid_list)
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name: # Description: Responsible for processing the data related to friend circle. The software can only see the records of the friend circle that have been viewed on the computer WeChat# Author: Rainbow# Date: 2024/11/08# ------------------------------------------------------------------------------ -import json from .dbbase import DatabaseBasefrom .utils import silk2audio, xml2dict, timestamp2str # FeedsV20: XML data of the circle of friends # CommentV20: Records of the circle of friends' likes or comments. # NotificationV7: Circle of Friends notification # SnsConfigV20: some configuration information that can be read with your Circle of Friends background image in it # SnsGroupInfoV5: guess it's a visible or invisible list of the visible range of the old WeChat Circle of Friends class SnsHandler(DatabaseBase): _ class_name = "Sns" Media_required_tables = ["AdFeedsV8", "FeedsV20", "CommentV20", "NotificationV7", "SnsConfigV20", "SnsFailureV5", " uGxMq1C4wvppcjBbyweK796GtT1hH3LGISYajZ2v7C11XhHk5icyDUXcWNSPk2MooeIa8Es5hXP0/0?idx=1&token= WSEN6qDsKwV8A02w3onOGQYfxnkibdqSOkmHhZGNB4DFumlE9p1vp0e0xjHoXlbbXRzwnQia6X5t3Anc4oqTuDg """ sql = ( "SELECT FeedId, CreateTime, FaultId. Type, UserName, Status, ExtFlag, PrivFlag, StringId, Content " "FROM FeedsV20 " "ORDER BY CreateTime DESC") FeedsV20 = (sql) for row in FeedsV20[2:]: ( FeedId, CreateTime, FaultId, Type, UserName, Status, ExtFlag, PrivFlag, StringId, Content) = row Content = xml2dict(Content) if Content and ("<") else Content CreateTime = timestamp2str(CreateTime) print( f"{FeedId=}\n" f"{CreateTime=}\n" f"{FaultId=}\n" f"{Type=}\n" f"{UserName=}\n" f" {Status=}\n" f"{ExtFlag=}\n" f"{PrivFlag=}\n" f"{StringId=}\n\n" f"{(Content, indent=4, ensure_ascii=False)}\n\n" ) return FeedId, CreateTime, FaultId, Type, UserName, Status, ExtFlag, PrivFlag, StringId, Content def get_sns_comment(self): pass