import requests import os import sys from time import gmtime, strftime import crython #SSL Cert Verification # Requests verifies SSL certificates for HTTPS requests, just like a web browser. # By default, SSL verification is enabled, and Requests will throw a SSLError # if it’s unable to verify the certificate. def print_zone(x): return f"{{name: {x['name']} , id: {x['id']}}}" def print_dns_record(x): return f"{{name: {x['name']} , id: {x['id']} , type: {x['type']}}}" def get_zones(auth_token): def zone_to_id_name(x): return { 'id': x['id'], 'name': x['name'] } api_url = "https://api.cloudflare.com/client/v4/zones" response = requests.get(api_url, headers={ 'Content-Type': 'application/json', 'Authorization': f'Bearer {auth_token}' }) response_json = response.json() if response_json['success'] == False: raise "" zones = list(map(zone_to_id_name, response_json['result'])) print('Zones: ') print('\n'.join(map(print_zone, zones))) active_zones = list(map(zone_to_id_name,filter(lambda x: x['status'] == 'active',response_json['result']))) print('Active zones: ') print('\n'.join(map(print_zone, active_zones))) return (zones, active_zones) def get_dns_records(auth_token, active_zones): def dns_record_to_id_name_type(x): return { 'id': x['id'], 'name': x['name'], 'zone_id': x['zone_id'], 'zone_name': x['zone_name'], 'type': x['type'] } dns_a_records: list = [] dns_records: list = [] for active_zone in active_zones: api_url = f"https://api.cloudflare.com/client/v4/zones/{active_zone['id']}/dns_records" response = requests.get(api_url, headers={ 'Content-Type': 'application/json', 'Authorization': f'Bearer {auth_token}' }) response_json = response.json() if response_json['success'] == False: raise "" dns_records.extend(list(map(dns_record_to_id_name_type, response_json['result']))) print('Dns records: ') print('\n'.join(map(print_dns_record, dns_records))) dns_a_records.extend(list(map(dns_record_to_id_name_type,filter(lambda x: x['type'] == 'A',response_json['result'])))) print('Dns records A: ') print('\n'.join(map(print_dns_record, dns_a_records))) return (dns_records, dns_a_records) # Crython supports seven fields # (seconds, minutes, hours, day of month, month, weekday, year). # https://crontab.guru/#*/5_*_*_*_* old_ip: str = '0.0.0.0' @crython.job(expr='0 */5 * * * * *') def main(): TOKEN = os.environ.get('CLOUDFLARE_TOKEN') DNS = os.environ.get('ZONES') DNS_A = os.environ.get('DNS_A_RECORDS') DNS = DNS.split(',') DNS_A.split(',') current_time = strftime("%d %b, %Y %H:%M:%S", gmtime()) print(f"{current_time}: EXECUTION STARTED") my_public_ip = requests.get('https://ifconfig.me').text if old_ip == my_public_ip: print("NOTHING TO UPDATE") sys.exit(0) print(f'PUBLIC IP: {my_public_ip}') print("Zones:") (zones, active_zones) = get_zones(TOKEN) active_zones = list(filter(lambda x: x['name'] in DNS, active_zones)) print("Active zones to update:") print('\n'.join(map(print_zone,active_zones))) (dns_records, dns_a_records) = get_dns_records(TOKEN, active_zones) print(dns_a_records) for dns_a_record in dns_a_records: # TOOD api_url = f"https://api.cloudflare.com/client/v4/zones/{dns_a_record['zone_id']}/dns_records/{dns_a_record['id']}" response = requests.patch(api_url, headers={ 'Content-Type': 'application/json', 'Authorization': f'Bearer {TOKEN}' }, data= { 'content': my_public_ip, 'type': 'A' }) response_json = response.json() if response_json['success'] == False: raise "" print(f"Successfuly chcanged IP address of {dns_a_record['name']}") print(response_json) current_time = strftime("%d %b, %Y %H:%M:%S", gmtime()) old_ip = my_public_ip print(f"{current_time} FINISHED EXECUTION SUCCESSFULLY") # api_url = f"https://api.cloudflare.com/client/v4/zones/{dns_a_record['zone_id']}/dns_records/{dns_a_record['id']}" # response = requests.patch(api_url, headers={ # 'Content-Type': 'application/json', # 'Authorization': f'Bearer {TOKEN}' # }, data= { # 'content': my_public_ip, # 'type': 'A' # }) # response_json = response.json() # if response_json['success'] == False: # raise "" # print(f"Successfuly chcanged IP address of {dns_a_record['name']}") # api_url = f"https://api.cloudflare.com/client/v4/zones/{dns_a_record['zone_id']}/dns_records/{dns_a_record['id']}" # response = requests.get(api_url, headers={ # 'Content-Type': 'application/json', # 'Authorization': f'Bearer {TOKEN}' # }) # response_json = response.json()