From cb91739d865f0a5c31534eebd98de700fef05017 Mon Sep 17 00:00:00 2001 From: Jim Martens Date: Sun, 5 Jul 2020 18:20:42 +0200 Subject: [PATCH] Added single point of entry for application --- src/twomartens/allrisscraper/internal.py | 213 ++++++++++++++++++++++ src/twomartens/allrisscraper/main.py | 221 ++--------------------- 2 files changed, 224 insertions(+), 210 deletions(-) create mode 100644 src/twomartens/allrisscraper/internal.py diff --git a/src/twomartens/allrisscraper/internal.py b/src/twomartens/allrisscraper/internal.py new file mode 100644 index 0000000..635998e --- /dev/null +++ b/src/twomartens/allrisscraper/internal.py @@ -0,0 +1,213 @@ +# -*- coding: utf-8 -*- + +# Copyright 2020 Jim Martens +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import configparser +import os +from datetime import date +from datetime import time +from typing import List +from typing import Tuple +from urllib import request + +from selenium import webdriver +from selenium.webdriver.firefox.firefox_binary import FirefoxBinary +from selenium.webdriver.firefox.options import Options +from selenium.webdriver.remote.webelement import WebElement + +from twomartens.allrisscraper import config as config_module +from twomartens.allrisscraper import definitions +from twomartens.allrisscraper import meeting +from twomartens.allrisscraper.definitions import ALLRIS_LOGIN + + +def main() -> None: + config_file = f"{os.getcwd()}/tm-allris-scraper-config.ini" + if not config_module.initialize_config(config_file): + return + + config = configparser.ConfigParser() + config.read(config_file) + district = config["Default"]["district"] + username = config["Default"]["username"] + password = config["Default"]["password"] + pdf_location = config["Default"]["pdflocation"] + firefox_binary = config["Default"]["firefoxBinary"] + base_url = definitions.BASE_LINKS[district] + + options = Options() + options.headless = True + binary = FirefoxBinary(firefox_binary) + driver = webdriver.Firefox(firefox_binary=binary, options=options) + driver.delete_all_cookies() + driver.implicitly_wait(2) + driver.get(ALLRIS_LOGIN) + login(driver, username=username, password=password) + driver.get("https://serviceportal.hamburg.de/HamburgGateway/Service/StartService/ALLMAnd") + driver.get(f"{base_url}/si012.asp") + meetings = get_meetings(driver) + download_documents(driver, meetings, pdf_location, base_url, district) + driver.close() + + +def login(driver: webdriver.Firefox, username: str, password: str) -> None: + login_field = driver.find_element_by_id("Username") + login_field.send_keys(username) + password_field = driver.find_element_by_id("Password") + password_field.send_keys(password) + button = driver.find_element_by_class_name("btn-primary") + button.click() + + +def get_meetings(driver: webdriver.Firefox) -> List[meeting.Meeting]: + elements = driver.find_elements_by_class_name("zl12") + elements.extend(driver.find_elements_by_class_name("zl11")) + meetings = list() + for element in elements: + tds = element.find_elements_by_tag_name("td") + date_obj = get_day(tds[0].text) + time_obj = time.fromisoformat(str(tds[1].text).rstrip()) + agenda_link = tds[4].find_element_by_tag_name("a").get_property("href") + name = tds[4].find_element_by_tag_name("a").text + location = tds[5].text + meetings.append(meeting.Meeting(name=name, date=date_obj, + time=time_obj, end_time=None, + link=agenda_link, location=location, + agenda=None, address=None)) + + return meetings + + +def download_documents(driver: webdriver.Firefox, meetings: List[meeting.Meeting], + pdf_location: str, base_url: str, district: str) -> None: + base_link = f"{base_url}/do027.asp" + for _meeting in meetings: + driver.get(_meeting.link) + td = driver.find_element_by_xpath("//table[@class='tk1']//td[@class='me1']") + form_elements = td.find_elements_by_tag_name("form") + agenda_link, total_link, invitation_link = get_links(form_elements, base_link) + if len(agenda_link) > 0: + driver.get(agenda_link) + save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Tagesordnung.pdf") + if len(total_link) > 0: + driver.get(total_link) + save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Mappe.pdf") + if len(invitation_link) > 0: + driver.get(invitation_link) + save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Einladung.pdf") + save_file(_meeting.location, f"{get_formatted_filename(pdf_location, _meeting, district)}/ort.txt") + + +def get_links(form_elements: List[WebElement], base_link: str) -> Tuple[str, str, str]: + agenda_name = "Tagesordnung" + updated_agenda_name = "Aktuelle TO" + total_name = "Alle Dokumente zur Sitzung im Paket" + total_short_name = "Mappe" + invitation_name = "Einladung" + + links = {} + for element in form_elements: + name = element.find_element_by_class_name("il2_p").get_property("value") + link = f"{base_link}?DOLFDNR={element.find_element_by_name('DOLFDNR').get_property('value')}&options=64" + if name == agenda_name: + links[agenda_name] = link + if name == updated_agenda_name: + links[agenda_name] = link + if name == total_name: + links[total_short_name] = link + if name == invitation_name: + links[invitation_name] = link + + if agenda_name not in links: + links[agenda_name] = "" + if invitation_name not in links: + links[invitation_name] = "" + if total_short_name not in links: + links[total_short_name] = "" + + return links[agenda_name], links[total_short_name], links[invitation_name] + + +def get_formatted_filename(pdf_location: str, meeting_obj: meeting.Meeting, district: str) -> str: + return f"{pdf_location}{meeting_obj.date.isoformat()}_{get_abbreviated_committee_name(meeting_obj.name, district)}" + + +def save_pdf(url: str, dest: str) -> None: + file_data: request = request.urlopen(url) + data_to_write = file_data.read() + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, "wb") as file: + file.write(data_to_write) + + +def save_file(content: str, dest: str) -> None: + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, "w") as file: + file.write(content) + + +def get_day(date_str: str) -> date: + date_elements = date_str[date_str.find(",") + 1:].split(".") + return date(int(date_elements[-1]), int(date_elements[-2]), int(date_elements[-3])) + + +def get_abbreviated_committee_name(name: str, district: str) -> str: + start_committee = "Sitzung des Ausschusses" + start_regional_committee = "Sitzung des Regionalausschusses" + start_plenary = "Sitzung der Bezirksversammlung" + start_youth_help_committee = "Sitzung des Jugendhilfeausschusses" + start_other_committee = "Sitzung des" + end_other_committee = "ausschusses" + abbreviated_name = "" + if name.startswith(start_plenary): + abbreviated_name = "BV" + elif name.startswith(start_committee): + second_part = name[len(start_committee):] + second_split = second_part.split(sep=",") + abbreviated_name = get_abbreviation(second_split) + if len(abbreviated_name) == 1: + abbreviated_name = f"A{abbreviated_name}" + elif name.startswith(start_regional_committee): + second_part = name[len(start_regional_committee):] + second_split = second_part.split(sep="/") + abbreviated_name = f"Ra{get_abbreviation(second_split)}" + elif name.startswith(start_youth_help_committee): + abbreviated_name = "JHA" + elif name.startswith(start_other_committee) and name.endswith(end_other_committee): + core_name = name[len(start_other_committee):-len(end_other_committee)] + abbreviated_name = core_name + + if abbreviated_name in definitions.ABBREVIATIONS[district]: + abbreviated_name = definitions.ABBREVIATIONS[district][abbreviated_name] + + return abbreviated_name + + +def get_abbreviation(name): + abbreviated_name = "" + for part in name: + part = part.lstrip() + if "und" in part: + part_split = part.split("und") + first_part = part_split[0].lstrip() + second_part = part_split[1].lstrip() + abbreviated_name = f"{abbreviated_name}{first_part[:1].capitalize()}{second_part[:1].capitalize()}" + else: + abbreviated_name = f"{abbreviated_name}{part[:1].capitalize()}" + return abbreviated_name + + +if __name__ == "__main__": + main() diff --git a/src/twomartens/allrisscraper/main.py b/src/twomartens/allrisscraper/main.py index 33f9c29..2f3cf9f 100644 --- a/src/twomartens/allrisscraper/main.py +++ b/src/twomartens/allrisscraper/main.py @@ -1,214 +1,15 @@ -# -*- coding: utf-8 -*- +import argparse -# Copyright 2020 Jim Martens -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import configparser -from urllib import request -from datetime import date -from datetime import time -import os - -from typing import List, Tuple - -from selenium import webdriver -from selenium.webdriver.common.by import By -from selenium.webdriver.firefox.firefox_binary import FirefoxBinary -from selenium.webdriver.firefox.options import Options -from selenium.webdriver.remote.webelement import WebElement - -from twomartens.allrisscraper import config as config_module -from twomartens.allrisscraper import definitions -from twomartens.allrisscraper import meeting -from twomartens.allrisscraper.definitions import ALLRIS_LOGIN +from twomartens.allrisscraper import internal +from twomartens.allrisscraper import public -def main() -> None: - config_file = f"{os.getcwd()}/tm-allris-scraper-config.ini" - if not config_module.initialize_config(config_file): - return - - config = configparser.ConfigParser() - config.read(config_file) - district = config["Default"]["district"] - username = config["Default"]["username"] - password = config["Default"]["password"] - pdf_location = config["Default"]["pdflocation"] - firefox_binary = config["Default"]["firefoxBinary"] - base_url = definitions.BASE_LINKS[district] +def main(): + parser = argparse.ArgumentParser(description="Scrape the ALLRis website") + parser.add_argument("mode", choices=["oparl", "internal"], help="which mode should be used") + args = parser.parse_args() - options = Options() - options.headless = True - binary = FirefoxBinary(firefox_binary) - driver = webdriver.Firefox(firefox_binary=binary, options=options) - driver.delete_all_cookies() - driver.implicitly_wait(2) - driver.get(ALLRIS_LOGIN) - login(driver, username=username, password=password) - driver.get("https://serviceportal.hamburg.de/HamburgGateway/Service/StartService/ALLMAnd") - driver.get(f"{base_url}/si012.asp") - meetings = get_meetings(driver) - download_documents(driver, meetings, pdf_location, base_url, district) - driver.close() - - -def login(driver: webdriver.Firefox, username: str, password: str) -> None: - login_field = driver.find_element_by_id("Username") - login_field.send_keys(username) - password_field = driver.find_element_by_id("Password") - password_field.send_keys(password) - button = driver.find_element_by_class_name("btn-primary") - button.click() - - -def get_meetings(driver: webdriver.Firefox) -> List[meeting.Meeting]: - elements = driver.find_elements_by_class_name("zl12") - elements.extend(driver.find_elements_by_class_name("zl11")) - meetings = list() - for element in elements: - tds = element.find_elements_by_tag_name("td") - date_obj = get_day(tds[0].text) - time_obj = time.fromisoformat(str(tds[1].text).rstrip()) - agenda_link = tds[4].find_element_by_tag_name("a").get_property("href") - name = tds[4].find_element_by_tag_name("a").text - location = tds[5].text - meetings.append(meeting.Meeting(name=name, date=date_obj, - time=time_obj, end_time=None, - link=agenda_link, location=location, - agenda=None, address=None)) - - return meetings - - -def download_documents(driver: webdriver.Firefox, meetings: List[meeting.Meeting], - pdf_location: str, base_url: str, district: str) -> None: - base_link = f"{base_url}/do027.asp" - for _meeting in meetings: - driver.get(_meeting.link) - td = driver.find_element_by_xpath("//table[@class='tk1']//td[@class='me1']") - form_elements = td.find_elements_by_tag_name("form") - agenda_link, total_link, invitation_link = get_links(form_elements, base_link) - if len(agenda_link) > 0: - driver.get(agenda_link) - save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Tagesordnung.pdf") - if len(total_link) > 0: - driver.get(total_link) - save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Mappe.pdf") - if len(invitation_link) > 0: - driver.get(invitation_link) - save_pdf(driver.current_url, f"{get_formatted_filename(pdf_location, _meeting, district)}/Einladung.pdf") - save_file(_meeting.location, f"{get_formatted_filename(pdf_location, _meeting, district)}/ort.txt") - - -def get_links(form_elements: List[WebElement], base_link: str) -> Tuple[str, str, str]: - agenda_name = "Tagesordnung" - updated_agenda_name = "Aktuelle TO" - total_name = "Alle Dokumente zur Sitzung im Paket" - total_short_name = "Mappe" - invitation_name = "Einladung" - - links = {} - for element in form_elements: - name = element.find_element_by_class_name("il2_p").get_property("value") - link = f"{base_link}?DOLFDNR={element.find_element_by_name('DOLFDNR').get_property('value')}&options=64" - if name == agenda_name: - links[agenda_name] = link - if name == updated_agenda_name: - links[agenda_name] = link - if name == total_name: - links[total_short_name] = link - if name == invitation_name: - links[invitation_name] = link - - if agenda_name not in links: - links[agenda_name] = "" - if invitation_name not in links: - links[invitation_name] = "" - if total_short_name not in links: - links[total_short_name] = "" - - return links[agenda_name], links[total_short_name], links[invitation_name] - - -def get_formatted_filename(pdf_location: str, meeting_obj: meeting.Meeting, district: str) -> str: - return f"{pdf_location}{meeting_obj.date.isoformat()}_{get_abbreviated_committee_name(meeting_obj.name, district)}" - - -def save_pdf(url: str, dest: str) -> None: - file_data: request = request.urlopen(url) - data_to_write = file_data.read() - os.makedirs(os.path.dirname(dest), exist_ok=True) - with open(dest, "wb") as file: - file.write(data_to_write) - - -def save_file(content: str, dest: str) -> None: - os.makedirs(os.path.dirname(dest), exist_ok=True) - with open(dest, "w") as file: - file.write(content) - - -def get_day(date_str: str) -> date: - date_elements = date_str[date_str.find(",") + 1:].split(".") - return date(int(date_elements[-1]), int(date_elements[-2]), int(date_elements[-3])) - - -def get_abbreviated_committee_name(name: str, district: str) -> str: - start_committee = "Sitzung des Ausschusses" - start_regional_committee = "Sitzung des Regionalausschusses" - start_plenary = "Sitzung der Bezirksversammlung" - start_youth_help_committee = "Sitzung des Jugendhilfeausschusses" - start_other_committee = "Sitzung des" - end_other_committee = "ausschusses" - abbreviated_name = "" - if name.startswith(start_plenary): - abbreviated_name = "BV" - elif name.startswith(start_committee): - second_part = name[len(start_committee):] - second_split = second_part.split(sep=",") - abbreviated_name = get_abbreviation(second_split) - if len(abbreviated_name) == 1: - abbreviated_name = f"A{abbreviated_name}" - elif name.startswith(start_regional_committee): - second_part = name[len(start_regional_committee):] - second_split = second_part.split(sep="/") - abbreviated_name = f"Ra{get_abbreviation(second_split)}" - elif name.startswith(start_youth_help_committee): - abbreviated_name = "JHA" - elif name.startswith(start_other_committee) and name.endswith(end_other_committee): - core_name = name[len(start_other_committee):-len(end_other_committee)] - abbreviated_name = core_name - - if abbreviated_name in definitions.ABBREVIATIONS[district]: - abbreviated_name = definitions.ABBREVIATIONS[district][abbreviated_name] - - return abbreviated_name - - -def get_abbreviation(name): - abbreviated_name = "" - for part in name: - part = part.lstrip() - if "und" in part: - part_split = part.split("und") - first_part = part_split[0].lstrip() - second_part = part_split[1].lstrip() - abbreviated_name = f"{abbreviated_name}{first_part[:1].capitalize()}{second_part[:1].capitalize()}" - else: - abbreviated_name = f"{abbreviated_name}{part[:1].capitalize()}" - return abbreviated_name - - -if __name__ == "__main__": - main() + if args.mode == "oparl": + public.main() + else: + internal.main()