【原创】instagram 批量下载工具
获取视频链接,导入功能
import json
import mysql.connector
from mysql.connector import Error
from datetime import datetime
import glob
import os
import time
import configparser
# ---------- MySQL settings, read from config.ini ----------
config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')

# Keyword arguments for the MySQL connection. The four credentials are taken
# verbatim from the [mysql] section; connect_timeout is fixed here.
db_config = {
    option: config.get('mysql', option)
    for option in ('host', 'user', 'password', 'database')
}
db_config['connect_timeout'] = 30


# Load a single JSON file
def load_json_file(file_path):
    """Load one JSON file and return its top-level object.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The parsed ``dict``. An empty ``dict`` is returned (and a message is
        printed) when the file cannot be read, is not valid JSON, or its
        top-level value is not a JSON object.
    """
    try:
        # 'utf-8-sig' reads plain UTF-8 and also strips a leading BOM, which
        # json.load() would otherwise reject as invalid JSON.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            data = json.load(file)
    except json.JSONDecodeError as e:
        print(f"文件 {file_path} JSON 解析错误: {e}")
        return {}
    except Exception as e:
        # Best-effort boundary: one unreadable file must not stop the import.
        print(f"加载 JSON 文件 {file_path} 失败: {e}")
        return {}

    if not isinstance(data, dict):
        print(f"文件 {file_path} 的内容不是有效的 JSON 对象")
        return {}
    return data


# Extract post data from the parsed JSON
def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _clip_url(value, limit=1000):
    """Return the string *value* cut to *limit* characters, or '' for non-strings."""
    return value[:limit] if isinstance(value, str) else ''


def _first_url(versions):
    """Return the clipped 'url' of the first entry of a non-empty list, else None."""
    if isinstance(versions, list) and versions:
        return _clip_url(_as_dict(versions[0]).get('url'))
    return None


def extract_post_data(json_data, file_path):
    """Turn one parsed timeline JSON document into a list of post records.

    Posts are read from
    ``json_data['data']['xdt_api__v1__feed__user_timeline_graphql_connection']['edges']``.
    Edges that are not dicts, have no dict ``node``, or whose node has no
    ``code`` are skipped with a printed message. Nested fields that are
    missing or ``null`` (``user``, ``image_versions2``, ``location``, a
    version's ``url``) fall back to empty strings instead of aborting the
    whole file.

    Args:
        json_data: Parsed JSON object (a dict).
        file_path: Source file name; used only in printed messages.

    Returns:
        A list of dicts with the keys ``code``, ``caption``, ``username``,
        ``profile_pic_url``, ``video_url``, ``image_url``, ``like_count``,
        ``comment_count``, ``location``, ``created_at``, ``ai_description``,
        ``other_ai_caption``, ``video_caption``, ``is_downloaded``,
        ``local_path`` and ``is_uploaded_other_platform``. An empty list is
        returned when the expected structure is absent or an unexpected
        error occurs.
    """
    try:
        data = json_data.get('data')
        if not data:
            print(f"文件 {file_path} 缺少 'data' 字段或为空")
            return []

        connection = data.get('xdt_api__v1__feed__user_timeline_graphql_connection')
        if not connection:
            print(f"文件 {file_path} 缺少 'xdt_api__v1__feed__user_timeline_graphql_connection' 字段或为空")
            return []

        # A present-but-null 'edges' is treated like an empty list.
        edges = connection.get('edges') or []
        print(f"文件 {file_path} 中总共有 {len(edges)} 条数据")

        posts = []
        for i, edge in enumerate(edges):
            if not isinstance(edge, dict):
                print(f"文件 {file_path} 第 {i+1} 条数据 edge 不是字典,跳过")
                continue

            node = edge.get('node')
            if not isinstance(node, dict):
                print(f"文件 {file_path} 第 {i+1} 条数据缺少有效 node,跳过")
                continue

            caption = node.get('caption')
            if isinstance(caption, dict):
                created_at = caption.get('created_at', '')
                caption_text = caption.get('text', '')
            else:
                created_at = ''
                caption_text = ''

            if created_at:
                try:
                    # NOTE: fromtimestamp() without tz converts to the local
                    # time zone of the machine running the import.
                    created_at = datetime.fromtimestamp(created_at).strftime('%Y-%m-%d %H:%M:%S')
                except (TypeError, ValueError, OverflowError, OSError) as e:
                    print(f"文件 {file_path} 第 {i+1} 条数据时间戳错误: {e}")
                    created_at = ''

            video_url = _first_url(node.get('video_versions'))
            if video_url is None:
                print(f"文件 {file_path} 第 {i+1} 条数据缺少 video_versions 或为空")
                video_url = ''

            image_url = _first_url(_as_dict(node.get('image_versions2')).get('candidates'))
            if image_url is None:
                print(f"文件 {file_path} 第 {i+1} 条数据缺少 image_versions2 或为空")
                image_url = ''

            user = _as_dict(node.get('user'))
            post = {
                'code': node.get('code', ''),
                'caption': caption_text,
                'username': user.get('username', ''),
                'profile_pic_url': _clip_url(user.get('profile_pic_url')),
                'video_url': video_url,
                'image_url': image_url,
                'like_count': node.get('like_count', 0),
                'comment_count': node.get('comment_count', 0),
                'location': _as_dict(node.get('location')).get('name', ''),
                'created_at': created_at,
                'ai_description': '',
                'other_ai_caption': '',
                'video_caption': caption_text,
                'is_downloaded': 0,
                'local_path': '',
                'is_uploaded_other_platform': 0,
            }

            if not post['code']:
                print(f"文件 {file_path} 第 {i+1} 条数据缺少 code,跳过: {post}")
                continue

            posts.append(post)
            print(f"文件 {file_path} 提取第 {i+1} 条数据: code={post['code']}")

        print(f"文件 {file_path} 成功提取 {len(posts)} 条有效数据")
        return posts
    except Exception as e:
        # Last-resort boundary so a structurally unexpected file is reported
        # and skipped rather than crashing the whole import run.
        print(f"文件 {file_path} 提取数据时发生未知错误: {e}")
        return []


# Insert data into MySQL
def insert_posts_to_mysql(posts):
    """Insert posts into ``fa_instagram_posts``, updating rows that already exist.

    Each post is first inserted; when the insert raises ``IntegrityError``
    the row with the same ``code`` is updated instead. Data errors and other
    per-post errors are printed and that post is skipped. The whole batch is
    committed once at the end. Connection-level errors trigger up to three
    attempts in total.

    Args:
        posts: Iterable of dicts holding the sixteen column values
            (``code`` ... ``is_uploaded_other_platform``).

    Returns:
        Number of posts inserted or updated, or 0 when every attempt failed.
    """
    columns = (
        'code', 'caption', 'username', 'profile_pic_url', 'video_url',
        'image_url', 'like_count', 'comment_count', 'location', 'created_at',
        'ai_description', 'other_ai_caption', 'video_caption',
        'is_downloaded', 'local_path', 'is_uploaded_other_platform',
    )
    sql_insert = """
        INSERT INTO fa_instagram_posts (
            code, caption, username, profile_pic_url, video_url, image_url,
            like_count, comment_count, location, created_at, ai_description,
            other_ai_caption, video_caption, is_downloaded, local_path,
            is_uploaded_other_platform
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    # NOTE(review): the UPDATE rewrites is_downloaded, local_path and the
    # caption/AI columns from `post` as well; if callers pass fresh defaults
    # this resets state stored on the existing row - confirm that is intended.
    sql_update = """
        UPDATE fa_instagram_posts SET
            caption=%s, username=%s, profile_pic_url=%s, video_url=%s,
            image_url=%s, like_count=%s, comment_count=%s, location=%s,
            created_at=%s, ai_description=%s, other_ai_caption=%s,
            video_caption=%s, is_downloaded=%s, local_path=%s,
            is_uploaded_other_platform=%s
        WHERE code=%s
    """

    retries = 3
    for attempt in range(retries):
        # Reset per attempt so `finally` never touches an unbound name or a
        # stale connection left over from a previous attempt.
        connection = None
        cursor = None
        try:
            connection = mysql.connector.connect(**db_config)
            cursor = connection.cursor()

            success_count = 0
            for post in posts:
                values = tuple(post[column] for column in columns)
                try:
                    cursor.execute(sql_insert, values)
                    success_count += 1
                    print(f"成功插入帖子 {post['code']}")
                except mysql.connector.errors.IntegrityError:
                    print(f"帖子 {post['code']} 已存在,尝试更新")
                    # Same values, with `code` moved last for the WHERE clause.
                    update_values = values[1:] + (post['code'],)
                    cursor.execute(sql_update, update_values)
                    success_count += 1
                    print(f"成功更新帖子 {post['code']}")
                except mysql.connector.errors.DataError as de:
                    print(f"数据错误 for 帖子 {post['code']}: {de}")
                    print(f"数据详情: {values}")
                except Exception as e:
                    print(f"其他错误 for 帖子 {post['code']}: {e}")
                    print(f"数据详情: {values}")

            connection.commit()
            print(f"成功插入或更新 {success_count} 条记录(共尝试 {len(posts)} 条)")
            return success_count
        except Error as e:
            print(f"数据库错误 (尝试 {attempt+1}/{retries}): {e}")
            # Waiting is only useful when another attempt follows.
            if attempt + 1 < retries:
                time.sleep(2)
        finally:
            if connection is not None and connection.is_connected():
                if cursor is not None:
                    cursor.close()
                connection.close()

    print("达到最大重试次数,插入失败")
    return 0


# Main entry: process every JSON file in a directory
def import_all_json_files(directory):
    """Import every ``*.json`` file found directly inside *directory*.

    Each file is loaded with ``load_json_file``, converted with
    ``extract_post_data`` and written with ``insert_posts_to_mysql``. Files
    that fail to load or yield no posts are skipped. A summary with the total
    number of inserted or updated records is printed at the end.

    Args:
        directory: Directory that contains the JSON files.

    Returns:
        None.
    """
    if not os.path.isdir(directory):
        print(f"目录 {directory} 不存在")
        return

    # glob() returns files in arbitrary order; sort so that repeated runs
    # process them in the same sequence (later files update earlier rows).
    json_files = sorted(glob.glob(os.path.join(directory, "*.json")))
    if not json_files:
        print(f"目录 {directory} 中没有 JSON 文件")
        return

    print(f"找到 {len(json_files)} 个 JSON 文件")
    total_success = 0
    for file_path in json_files:
        print(f"\n处理文件: {file_path}")
        json_data = load_json_file(file_path)
        if not json_data:
            continue

        posts = extract_post_data(json_data, file_path)
        if posts:
            total_success += insert_posts_to_mysql(posts)
        else:
            print(f"文件 {file_path} 无有效数据")

    print(f"\n导入完成,总共成功插入或更新 {total_success} 条记录")


if __name__ == "__main__":
    # Directory that holds the JSON files to import
    json_directory = "instagramdatafile"
    import_all_json_files(json_directory)
config.ini
[mysql]
host = 192.168.250.249
user = root
password = 123456
database = xin**ang_com

[proxy]
enabled = true
http = http://127.0.0.1:1080
https = http://127.0.0.1:1080
下载代码
import os
import requests
import mysql.connector
from mysql.connector import Error
from datetime import datetime
import configparser
import logging
from tqdm import tqdm
# ---------- Dates and paths ----------
# Downloads and the log file are grouped per calendar day (YYYYMMDD).
today_str = datetime.now().strftime('%Y%m%d')
DOWNLOAD_DIR = os.path.join("downloadfile", today_str)
LOG_DIR = "logs"
LOG_FILE = os.path.join(LOG_DIR, f"download_{today_str}.log")
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# ---------- Logging setup ----------
# Every record goes both to the day's log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)

# ---------- Read configuration ----------
config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')

# Keyword arguments for the MySQL connection, taken from the [mysql] section.
db_config = {
    **{
        option: config.get('mysql', option)
        for option in ('host', 'user', 'password', 'database')
    },
    'connect_timeout': 30,
}
# ---------- Read proxy configuration ----------
proxy_enabled = config.getboolean('proxy', 'enabled', fallback=False)
# Proxy mapping handed to requests; None when the [proxy] section disables it.
proxies = {
    "http": config.get('proxy', 'http', fallback=None),
    "https": config.get('proxy', 'https', fallback=None),
} if proxy_enabled else None


def _discard_partial(path):
    """Delete a partially written download, tolerating a file that never existed."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # The request failed before anything was written.
    except OSError as e:
        logging.warning(f"无法删除未完成的文件 {path}: {e}")


def download_file(url, save_path):
    """Stream *url* to *save_path*, showing a progress bar.

    The data is written to ``save_path + ".part"`` and moved into place only
    after the whole body has been received, so a failed download never leaves
    a truncated file at *save_path*.

    Args:
        url: Address of the file to download.
        save_path: Destination path of the finished file.

    Returns:
        True when the file was downloaded completely, False on any error
        (the error is logged).
    """
    partial_path = save_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=20, proxies=proxies) as response:
            response.raise_for_status()
            total = int(response.headers.get('content-length', 0))
            with open(partial_path, 'wb') as file, tqdm(
                desc=os.path.basename(save_path),
                # None lets tqdm show a plain counter when the size is unknown.
                total=total or None,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        bar.update(len(chunk))
        os.replace(partial_path, save_path)
        return True
    except Exception as e:
        # Best-effort boundary: one failed download must not stop the batch.
        logging.error(f"下载失败: {url},错误: {e}")
        _discard_partial(partial_path)
        return False


def download_from_mysql():
    """Download up to 100 pending videos and mark them as downloaded.

    Selects rows of ``fa_instagram_posts`` with ``is_downloaded = 0`` and a
    non-empty ``video_url``, saves each video into ``DOWNLOAD_DIR`` as
    ``<index>-<code>.mp4`` and, after each successful download, stores the
    local path and sets ``is_downloaded = 1`` (committed per row). MySQL
    errors are logged, not raised.
    """
    # Pre-bind so `finally` is safe even when connect() itself fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor(dictionary=True)
        cursor.execute("""
            SELECT id, code, video_url FROM fa_instagram_posts
            WHERE is_downloaded = 0 AND video_url IS NOT NULL AND video_url != ''
            LIMIT 100
        """)
        rows = cursor.fetchall()
        logging.info(f"共找到 {len(rows)} 条未下载记录")

        update_sql = """
            UPDATE fa_instagram_posts
            SET is_downloaded = 1, local_path = %s
            WHERE id = %s
        """
        for idx, row in enumerate(rows, 1):
            code = row['code']
            url = row['video_url']
            save_name = f"{idx:02d}-{code}.mp4"
            save_path = os.path.join(DOWNLOAD_DIR, save_name)
            logging.info(f"[{idx}] 正在下载: {code}")

            if download_file(url, save_path):
                # Store the path with forward slashes regardless of platform.
                cursor.execute(update_sql, (save_path.replace("\\", "/"), row['id']))
                connection.commit()
                logging.info(f"[{idx}] 下载完成,已更新数据库: {save_path}")
            else:
                logging.warning(f"[{idx}] 下载失败,跳过: {code}")
    except Error as e:
        logging.error(f"MySQL 错误: {e}")
    finally:
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            logging.info("数据库连接已关闭")


if __name__ == "__main__":
    download_from_mysql()