-
Notifications
You must be signed in to change notification settings - Fork 0
/
immigration.py
79 lines (66 loc) · 2.57 KB
/
immigration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
from flask import Flask, request, jsonify, send_from_directory
from crewai import Agent, Task, Crew
from crewai_tools import SerperDevTool, ScrapeWebsiteTool, WebsiteSearchTool
# Initialize Flask app
app = Flask(__name__)

# API keys are taken from the process environment instead of being committed
# to source control. NOTE(review): any key that was ever committed to this
# file should be treated as leaked and rotated with the provider.
_REQUIRED_ENV_VARS = ("SERPER_API_KEY", "OPENAI_API_KEY")
_missing_env_vars = [
    name for name in _REQUIRED_ENV_VARS if not os.environ.get(name)
]
if _missing_env_vars:
    # Fail at startup with an explicit message rather than with an opaque
    # authentication error once a tool or the model is first used.
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env_vars)
    )

# Instantiate tools
search_tool = SerperDevTool()
scrape_tool = ScrapeWebsiteTool()
web_search_tool = WebsiteSearchTool()
# Immigration Information Specialist agent; it is given all three tools
# instantiated above (search, scrape, website search).
immigration_agent = Agent(
    verbose=True,
    tools=[search_tool, scrape_tool, web_search_tool],
    role="Immigration Information Specialist",
    goal="Provide detailed and up-to-date answers to immigration-related queries",
    # Adjacent literals are joined by the parser into one backstory string.
    backstory=(
        "An expert in immigration policies with extensive knowledge of the latest "
        "regulations and procedures. Ensure to include a disclaimer advising users "
        "to consult an immigration attorney for final decisions."
    ),
)
# Research task: module-level Task assigned to the immigration agent.
research_task = Task(
    agent=immigration_agent,
    description="Research the latest immigration policies and procedures from specified websites.",
    expected_output="A summary of the latest immigration policies and procedures.",
)
# Function to handle user questions
def handle_user_questions(question):
    """Run the immigration crew for one question and return its result.

    A per-question task is built from ``question`` and placed after the
    module-level ``research_task`` in the crew's task list; both tasks are
    assigned to ``immigration_agent``.

    Args:
        question: Text interpolated into the task description and the
            expected-output template.

    Returns:
        Whatever ``Crew.kickoff()`` returns for the assembled crew.
    """
    # Prompt and expected-output text for the per-question task.
    prompt = (
        f'Answer the following immigration-related question: "{question}". '
        'Ensure to include a disclaimer advising users to consult an immigration attorney.'
    )
    wanted_output = f'Detailed answer to the question: "{question}" with a disclaimer.'

    answer_task = Task(
        agent=immigration_agent,
        description=prompt,
        expected_output=wanted_output,
    )

    # Assemble the crew and execute its tasks in one expression.
    return Crew(
        tasks=[research_task, answer_task],
        agents=[immigration_agent],
        verbose=2,
    ).kickoff()
# Serve the frontend
@app.route('/')
def serve_frontend():
    """Return ``immigration.html`` from the ``'.'`` directory for the root URL."""
    frontend_dir, frontend_page = '.', 'immigration.html'
    return send_from_directory(frontend_dir, frontend_page)
# Define Flask routes
@app.route('/api/v1/immigration-question', methods=['POST'])
def ask_question():
    """Answer an immigration question submitted as a JSON POST body.

    The body is expected to look like ``{"question": "<text>"}``.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with ``{"response": <answer text>}`` on success;
        * 400 with ``{"error": "Question is required"}`` when the body is not
          a JSON object or has no non-blank string ``question``;
        * 500 with ``{"error": <exception text>}`` when answering fails.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so such requests receive the same JSON 400 as an empty question.
    data = request.get_json(silent=True)

    # Valid JSON need not be an object (e.g. `null` or a list); guard before
    # calling .get so those bodies do not surface as an unhandled 500.
    question = data.get('question') if isinstance(data, dict) else None
    if not isinstance(question, str) or not question.strip():
        return jsonify({"error": "Question is required"}), 400

    try:
        # NOTE(review): depending on the crewai version, kickoff() may return
        # a result object rather than a plain string; str() is a no-op for
        # strings and keeps the payload JSON-serializable otherwise.
        answer = str(handle_user_questions(question.strip()))
        return jsonify({"response": answer}), 200
    except Exception as e:
        # Top-level request boundary: keep the traceback in the server log
        # and report the failure to the client as JSON.
        app.logger.exception("Failed to answer immigration question")
        return jsonify({"error": str(e)}), 500
# Run the Flask app
if __name__ == '__main__':
    # Debug mode turns on the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute code. Keep it off unless explicitly
    # requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled)