164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
import re
|
|
import json
|
|
import requests
|
|
import datetime
|
|
|
|
rx_json_html = re.compile('window\._sharedData\s*=\s*(.*);\s*</script>')
|
|
#rx_json_html = re.compile('window\._sharedData\s*=\s*(.*);</script>')
|
|
old_date = datetime.datetime(1970, 1, 1, 9)
|
|
|
|
|
|
def get_json_from_html(content):
    """Extract and decode the ``window._sharedData`` JSON from an Instagram page.

    Args:
        content: the page as ``bytes``, ``str``, or a ``requests`` Response
            object (its raw body is decoded as UTF-8).

    Returns:
        The parsed JSON payload (normally a dict).

    Raises:
        TypeError: if *content* is an unsupported type, or if the page does
            not contain the expected ``window._sharedData`` script block
            (e.g. a blocked/redirected response).
    """
    # Normalise the input to a single text string before searching.
    # isinstance (not ``type(x) == T``) is the idiomatic check and also
    # accepts subclasses such as custom Response wrappers.
    if isinstance(content, bytes):
        s = content.decode('utf-8')
    elif isinstance(content, str):
        s = content
    elif isinstance(content, requests.models.Response):
        s = content.content.decode('utf-8')
    else:
        raise TypeError(
            "content must be bytes, str or requests.models.Response, "
            "got %s" % type(content).__name__
        )

    m = rx_json_html.search(s)
    if m:
        # group(1) is the raw JSON text between '= ' and ';</script>'.
        return json.loads(m.group(1))
    else:
        # No shared-data blob found; the response is probably not a normal page.
        raise TypeError("Check requests.response")
|
|
|
|
|
|
def parse_list_user_html(content):
    """Parse an Instagram profile page into a summary of the user's posts.

    Args:
        content: profile-page HTML (any form accepted by get_json_from_html).

    Returns:
        tuple: ``(body_list, end_cursor, has_next, user_id)`` where
        ``body_list`` holds one ``{"code", "url", "date"}`` dict per post,
        ``end_cursor``/``has_next`` drive pagination, and ``user_id`` is the
        profile owner's id. Empty/None defaults when no ProfilePage is present.
    """
    profile_pages = get_json_from_html(content)['entry_data']['ProfilePage']

    # Guard clause: nothing to parse -> neutral defaults.
    if not profile_pages:
        return [], None, False, None

    user = profile_pages[0]["user"]
    media = user["media"]
    page_info = media["page_info"]

    body_list = [
        {
            "code": node["code"],
            "url": "https://www.instagram.com/p/" + node["code"] + "/",
            # Post timestamps arrive as epoch seconds; shift off the module base.
            "date": old_date + datetime.timedelta(seconds=node["date"]),
        }
        for node in media["nodes"]
    ]

    return body_list, page_info["end_cursor"], page_info["has_next_page"], user["id"]
|
|
|
|
|
|
def parse_list_tag_html(content):
    """Parse an Instagram hashtag page into a summary of its posts.

    Args:
        content: tag-page HTML (any form accepted by get_json_from_html).

    Returns:
        tuple: ``(body_list, end_cursor, has_next)`` where ``body_list``
        holds one ``{"code", "url", "date"}`` dict per post and
        ``end_cursor``/``has_next`` drive pagination. Empty/None defaults
        when no TagPage entry is present.
    """
    tag_pages = get_json_from_html(content)["entry_data"]["TagPage"]

    # Guard clause: nothing to parse -> neutral defaults.
    if not tag_pages:
        return [], None, False

    media = tag_pages[0]["tag"]["media"]
    page_info = media["page_info"]

    body_list = [
        {
            "code": node["code"],
            "url": "https://www.instagram.com/p/" + node["code"] + "/",
            # Post timestamps arrive as epoch seconds; shift off the module base.
            "date": old_date + datetime.timedelta(seconds=node["date"]),
        }
        for node in media["nodes"]
    ]

    return body_list, page_info["end_cursor"], page_info["has_next_page"]
|
|
|
|
|
|
def parse_list_ajax(content):
    """Parse an Instagram AJAX pagination response into a list of posts.

    Args:
        content: raw response body as UTF-8 ``bytes`` containing a JSON
            object with ``status`` and ``media`` keys.

    Returns:
        tuple: ``(body_list, end_cursor, has_next)`` — one
        ``{"code", "url", "date"}`` dict per post plus pagination state.
        Empty/None defaults when ``status`` is not ``"ok"``.
    """
    # BUG FIX: json.loads(..., encoding="utf-8") — the ``encoding`` keyword
    # was deprecated since Python 3.1 and removed in 3.9, so it raised
    # TypeError on modern interpreters. The explicit decode() suffices.
    json_data = json.loads(content.decode('utf-8'))

    has_next = False
    end_cursor = None
    body_list = []

    if json_data["status"] == "ok":
        page_info = json_data["media"]["page_info"]
        has_next = page_info["has_next_page"]
        end_cursor = page_info["end_cursor"]
        for node in json_data["media"]["nodes"]:
            body_list.append(
                {
                    "code": node["code"],
                    "url": "https://www.instagram.com/p/" + node["code"] + "/",
                    # Epoch seconds -> datetime via the module-level base.
                    "date": old_date + datetime.timedelta(seconds=node["date"]),
                }
            )
    return body_list, end_cursor, has_next
|
|
|
|
|
|
def parse_body_html(content):
    """Parse an Instagram post page into an article body plus its comments.

    Args:
        content: post-page HTML (any form accepted by get_json_from_html).

    Returns:
        tuple: ``(body, reply, start_cursor, has_previous)`` — ``body`` is a
        flat dict describing the post, ``reply`` a list of comment dicts.
        NOTE(review): despite their names, ``start_cursor``/``has_previous``
        are filled from ``end_cursor``/``has_next_page`` of the comment page
        info — presumably the caller pages comments with them; confirm.
    """
    json_data = get_json_from_html(content)
    postpage = json_data["entry_data"]["PostPage"]
    body = {}
    reply = []
    start_cursor = None
    has_previous = False

    if postpage:
        media = postpage[0]["graphql"]["shortcode_media"]

        # BUG FIX: posts without a caption have an empty ``edges`` list, so
        # unconditional ``edges[0]`` raised IndexError; fall back to "".
        caption_edges = media["edge_media_to_caption"]["edges"]
        caption = caption_edges[0]["node"]["text"] if caption_edges else ""

        body = {
            "article_date": (old_date + datetime.timedelta(seconds=media["taken_at_timestamp"])).strftime("%Y-%m-%d %H:%M:%S"),
            "article_data": caption,
            "article_id": media["owner"]["username"],
            "article_nickname": media["owner"]["username"],
            "platform_id": media["owner"]["username"],
            "platform_name": "instagram",
            "platform_form": "post",
            "platform_title": media["owner"]["username"],
            "article_form": "body",
            "article_profileurl": media["owner"]["profile_pic_url"],
            # Comment count is repurposed as the ordering field downstream.
            "article_order": str(media["edge_media_to_comment"]["count"]),
            "article_hit": str(0),
            # Like count is stored under "reply_url" — presumably a schema
            # quirk of the consumer; kept as-is for compatibility.
            "reply_url": str(media["edge_media_preview_like"]["count"])
        }

        comments = media["edge_media_to_comment"]
        has_previous = comments["page_info"]["has_next_page"]
        start_cursor = comments["page_info"]["end_cursor"]
        for node in comments["edges"]:
            reply.append({
                "article_data": node["node"]["text"],
                "article_date":
                    (old_date + datetime.timedelta(seconds=node["node"]["created_at"])).strftime("%Y-%m-%d %H:%M:%S"),
                "article_id": node["node"]["owner"]["username"],
                "article_nickname": node["node"]["owner"]["username"],
                "article_profileurl": node["node"]["owner"]["profile_pic_url"],
                "platform_name": "instagram",
                "platform_form": "post",
                "article_form": "reply"
            })
    return body, reply, start_cursor, has_previous
|
|
|
|
|
|
def parse_reply_ajax(content):
    """Parse an Instagram AJAX comment-pagination response into reply dicts.

    Args:
        content: raw response body as UTF-8 ``bytes`` containing a JSON
            object with ``status`` and ``comments`` keys.

    Returns:
        tuple: ``(reply, start_cursor, has_previous)`` — one dict per
        comment plus backward-pagination state. Empty/None defaults when
        ``status`` is not ``"ok"``.
    """
    # BUG FIX: json.loads(..., encoding="utf-8") — the ``encoding`` keyword
    # was deprecated since Python 3.1 and removed in 3.9, so it raised
    # TypeError on modern interpreters. The explicit decode() suffices.
    json_data = json.loads(content.decode('utf-8'))

    reply = []
    start_cursor = None
    has_previous = False

    if json_data["status"] == "ok":
        comments = json_data["comments"]
        has_previous = comments["page_info"]["has_previous_page"]
        start_cursor = comments["page_info"]["start_cursor"]
        for node in comments["nodes"]:
            reply.append({
                "article_data": node["text"],
                "article_date":
                    (old_date + datetime.timedelta(seconds=node["created_at"])).strftime("%Y-%m-%d %H:%M:%S"),
                "article_id": node["user"]["username"],
                "article_nickname": node["user"]["username"],
                "article_profileurl": node["user"]["profile_pic_url"],
                "platform_name": "instagram",
                "platform_form": "post",
                "article_form": "reply",
            })
    return reply, start_cursor, has_previous
|