451 lines
16 KiB
Python
451 lines
16 KiB
Python
import heapq
import math
import re
import sys
import time
import tkinter as tk
from tkinter import filedialog

import ollama
import termcharts
from rich import print as pr
from rich.layout import Layout
from rich.panel import Panel
|
||
|
||
# Version label shown in the start-up banner and on the results dashboard.
cc_version = "ITA B-2-iv Final"
|
||
|
||
# P7MJ's ITA (In the Air) chat mood analyzer, as a YAY project between 3/2 and 3/3 and finished on 3/3 (A-2-i). No longer so yay...
|
||
# Version B-1-i Refining (aka B-2-i Testing) completed on 3/5
|
||
# Working on B-1-i 3/6 and 3/8. Procrastinating between. A-series, get ready to be lost in time, but i still have some copies
|
||
# B-2-i Testing testing on 3/8. Prelim testing completed.
|
||
# B-2-ii Final working on 3/8. Removing all TEST comments and running more test cases
|
||
# B-2-iii Final working on 3/9 Morning. Added a safeguard in case the LLM hallucinates giving the super_llm_rate.
|
||
# B-2-iv Final working on 3/9 Morning. Fixed and added static colors (then removed them)
|
||
# Presenting is 3/9
|
||
# TODO Add async I DONT KNOW WHEN DONT ASK ME BYE THE EnD
|
||
# TODO Add functions super_llm_rate for all-at-once YAY and batch_llm_rate for 5 at one time YAY
|
||
# TODO Integrate readable_format() YAY
|
||
# In competition with Sean. He's doing much better... NOOOO
|
||
# I HATE the "pg up" and "pg down" buttons above the left and right arrow keys! I always misclick them and my page jumps!
|
||
|
||
# WRITING LETTER TO H. COMBINED WITH THIS CODE MAKES MY BRAIN BURN
|
||
# Coding: ❌
|
||
# Breaking heart: ✔️
|
||
# Burning brain: ✔️
|
||
# Feeling like quitting: ✔️
|
||
|
||
# STATS VS OTHER VERSIONS: (ABORTED)
|
||
# Positive/Negative/Neutral Rating Time
|
||
|
||
# GOODLOG
|
||
# B-2-i/ii: 0/11/19 -0.307 89
|
||
# B-1-i:
|
||
|
||
# Module-level state shared by the rating functions and the main script.
# rating_data holds one rating per message, posanslist / neganslist hold the
# formatted top and bottom messages, messages_rated drives the progress line
# of the one-by-one mode, and chat_summary receives the LLM summary.
rating_data, posanslist, neganslist = [], [], []
total_messages = messages_rated = 0
chat_summary = chat_message_list = ""
|
||
|
||
# Find the indices of the n ratings with the largest values
|
||
def get_n_largest_indices(data, n):
    """Return the positions of the ``n`` largest values in ``data``.

    Positions are listed from the largest value downwards. Fewer than ``n``
    positions are returned when ``data`` has fewer than ``n`` items.
    """
    ranked = heapq.nlargest(n, enumerate(data), key=lambda pair: pair[1])
    return [position for position, _ in ranked]
|
||
|
||
# Make single or more messages readable
|
||
def readable_format(inputs):
    """Render chat entries as a ``<log>`` block with one ``user: message`` line each.

    Each item of ``inputs`` is indexed as ``[user, message]``. The result
    starts with ``<log>`` on its own line and ends with ``</log>``.
    """
    rendered_lines = "".join(f"{entry[0]}: {entry[1]}\n" for entry in inputs)
    return "<log>\n" + rendered_lines + "</log>"
|
||
|
||
# One readable message
|
||
def readable_one_message(single_input_list):
    """Render one entry as ``"first: second"`` from its first two items."""
    author = single_input_list[0]
    text = single_input_list[1]
    return "{}: {}".format(author, text)
|
||
|
||
# Find the indices of the n ratings with the smallest values
|
||
def get_n_smallest_indices(data, n):
    """Return the positions of the ``n`` smallest values in ``data``.

    Positions are listed from the smallest value upwards. Fewer than ``n``
    positions are returned when ``data`` has fewer than ``n`` items.
    """
    ranked = heapq.nsmallest(n, enumerate(data), key=lambda pair: pair[1])
    return [position for position, _ in ranked]
|
||
|
||
# Processes the raw chat log into a readable list
|
||
def parse_chat_log(file_path):
    """Parse an exported chat log file into a list of ``[header, message]`` pairs.

    A line beginning with a timestamp shaped like ``[03/02/2026 00:14]`` opens
    a new entry; the text after the timestamp becomes the entry's first item
    (the rest of the script treats it as the user name). Every following line
    up to the next timestamp is collected and joined with newlines to form the
    second item. Blank lines and lines starting with ``===`` or ``Exported``
    are skipped, and lines that appear before the first timestamp are dropped.
    """
    header_re = re.compile(r"\[\d{2}/\d{2}/\d{4} \d{2}:\d{2}\]\s+(.*)")
    entries = []
    speaker = None
    body_lines = []

    with open(file_path, 'r', encoding='utf-8') as log_file:
        for raw_line in log_file:
            text = raw_line.strip()
            if not text or text.startswith(("===", "Exported")):
                continue

            header = header_re.match(text)
            if header is None:
                # Continuation of the message currently being collected.
                body_lines.append(text)
                continue

            # A new header closes the entry collected so far, if there is one.
            if speaker:
                entries.append([speaker, "\n".join(body_lines)])
            speaker = header.group(1)
            body_lines = []

    # Flush the last entry, which has no following header to close it.
    if speaker:
        entries.append([speaker, "\n".join(body_lines)])
    return entries
|
||
|
||
def split_thy_list_for_thy_batch_function(inputses, batch_size=5):
    """Split ``inputses`` into consecutive batches of at most ``batch_size`` items.

    Args:
        inputses: Sequence to split. It is never modified, and every batch is
            a new list, so callers can mutate batches freely.
        batch_size: Maximum number of items per batch (default 5).

    Returns:
        A list of lists in the original order. Only the last batch may be
        shorter than ``batch_size``. Empty input gives an empty list, so no
        zero-message batch is ever handed to the LLM.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        list(inputses[start:start + batch_size])
        for start in range(0, len(inputses), batch_size)
    ]
|
||
|
||
# Filters and normalizes one raw rating. Text answers become -0.5: the LLM usually refuses to rate only when the topic is very bad, so a refusal is treated as negative. Exact 0.00 ratings become -0.10, because the LLM tends to label bad chats as 0.00 rather than give them a negative score.
|
||
def float_filter(value):
    """Normalize one raw LLM rating to a float on the -1.0 .. 1.0 scale.

    Args:
        value: Raw rating; a string from the LLM or an already parsed number.

    Returns:
        * ``-0.5`` when ``value`` cannot be read as a finite number (text
          replies, ``None``, ``nan``, ``inf``).
        * ``-0.10`` when the rating is exactly zero.
        * Otherwise the rating, clamped to the -1.0 .. 1.0 range the prompts
          ask for, so one out-of-scale reply cannot skew the average.
    """
    try:
        rating = float(value)
    except (TypeError, ValueError):
        return -0.5
    # NaN and infinity parse successfully but would poison the mean and the
    # largest/smallest ranking, so they are treated like unreadable text.
    if not math.isfinite(rating):
        return -0.5
    if rating == 0.0:
        return -0.10
    return max(-1.0, min(1.0, rating))
|
||
|
||
def process_llm_list(input_string):
    """Parse the LLM's comma-separated rating list into a list of floats.

    Whitespace, square brackets, quotes and stray commas around the whole
    reply are ignored, and whitespace around each item is optional, so
    ``"0.1, 0.2"``, ``"0.1,0.2"`` and ``"[0.1, 0.2]"`` all give the same
    result.

    Args:
        input_string: Raw reply text from the LLM.

    Returns:
        One float per comma-separated item. An item that is not a number is
        reported on stdout and replaced by ``-0.5``, the same value
        ``float_filter`` uses for unreadable ratings, so it stays inside the
        -1.0 .. 1.0 scale. An empty reply gives a single ``-0.5``.
    """
    fallback = -0.5
    cleaned = input_string.strip(" \t\r\n[]'\",")
    ratings = []
    for token in cleaned.split(","):
        token = token.strip()
        try:
            ratings.append(float(token))
        except ValueError:
            print(f"E {token} not_float")
            ratings.append(fallback)
    return ratings
|
||
|
||
# Uses the LLM to rate messages
|
||
def llm_rate(user_input, length):
    """Ask the LLM to rate a single chat entry and record its raw reply.

    ``user_input`` is formatted with ``readable_one_message``, so its first two
    items are sent as ``first: second``. The reply text is appended, unparsed,
    to the global ``rating_data``. ``length`` is only used as the total shown
    in the progress line, and the global ``messages_rated`` counter is bumped
    once per call.
    """
    global messages_rated

    # The system prompt DO NOT DELETE
    system_prompt = {
        'role': 'system',
        'content': (
            "Sentiment Analyzer: Output ONLY a float between -1.0 and 1.0."
            "Chat message format is user: message. Analyze single message."
            "Scoring: Hostile/Insulting (-0.8), Dismissive/Sarcastic (-0.5),"
            "Apathetic/Short (-0.2), Neutral/Functional (0.0), Positive/Link (0.1-1.0)."
            "No prose."
        ),
    }
    # The user (DO NOT DELETE)
    user_prompt = {'role': 'user', 'content': readable_one_message(user_input)}

    reply = ollama.chat(model='llama3.2', messages=[system_prompt, user_prompt])

    # Progress: overwrite the same console line after every rated message.
    messages_rated = messages_rated + 1
    print(f"\r{messages_rated}/{length} messages rated by LLM.", end="")

    # Keep the raw reply text; it is not converted to a number here.
    rating_data.append(reply['message']['content'])
|
||
|
||
# TODO make it rate all the messages at once
|
||
def super_llm_rate(inputs, max_attempts=None):
    """Rate every message in one LLM call, retrying until the count matches.

    The whole chat is sent as one ``<log>`` block. A reply is accepted only
    when it contains exactly one rating per entry of ``inputs``; otherwise
    "LLM Hallucinated!" is printed and the request is repeated.

    Args:
        inputs: List of ``[user, message]`` entries to rate.
        max_attempts: Optional upper bound on the number of requests. The
            default ``None`` keeps retrying without limit.

    Raises:
        RuntimeError: If ``max_attempts`` is given and no reply had the
            expected number of ratings within that many requests.

    Side effects:
        Extends the global ``rating_data`` with the accepted ratings.
    """
    # Validate against the list actually being rated rather than a script-level
    # global, so the function also works when it is imported and called directly.
    expected = len(inputs)

    # The conversation is identical on every attempt, so build it once.
    messages = [
        # The system prompt DO NOT DELETE
        {
            'role': 'system',
            'content': (
                "Sentiment Analyzer: Output ONLY a list of floats between -1.0 and 1.0, like '0.0, 0.5, -0.5'."
                "Chat messages are between <log> and </log>. Analyze the chat records and give rating to each message in order."
                "Scoring: Hostile/Insulting (-0.8), Dismissive/Sarcastic (-0.5),"
                "Apathetic/Short (-0.2), Neutral/Functional (0.0), Positive/Link (0.1-1.0)."
                f"There are {expected} messages in total. Make sure you have the same amount of ratings."
                "Exactly a comma followed by a space between ratings. No prose. Double check amount."
            ),
        },
        # The user (DO NOT DELETE)
        {'role': 'user', 'content': readable_format(inputs)},
    ]

    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        print("LLM All-at-once Rating Mode rating...", end=" ", flush=True)
        response = ollama.chat(model='llama3.2', messages=messages)
        ratings = process_llm_list(response['message']['content'])

        # Accept the reply only when every message received exactly one rating.
        if len(ratings) == expected:
            rating_data.extend(ratings)
            return
        print("LLM Hallucinated!")

    raise RuntimeError(
        f"LLM did not return {expected} ratings within {max_attempts} attempts"
    )
|
||
|
||
# Accepts a list of junk! (unprocessed parsed list)
|
||
def batch_llm_rate(inputs, max_attempts=3):
    """Rate messages in batches and append the ratings to ``rating_data``.

    ``inputs`` is split with ``split_thy_list_for_thy_batch_function`` and each
    batch is sent to the LLM as one ``<log>`` block. Every batch contributes
    exactly as many ratings as it has messages, so ``rating_data`` stays
    aligned with ``inputs`` even when the LLM miscounts.

    Args:
        inputs: List of ``[user, message]`` entries to rate.
        max_attempts: How many times one batch is requested before its reply
            is forced to the right length (values below 1 count as 1).

    Side effects:
        Extends the global ``rating_data``, prints progress to stdout, and
        exits the process with status 1 if the batches cannot be created.
    """
    print("Creating batches...", end=" ", flush=True)
    try:
        batches = split_thy_list_for_thy_batch_function(inputs)
    except Exception:
        print("Fail!")
        sys.exit(1)  # non-zero: batching failed, so the run did not succeed
    else:
        print("Success!")

    print("Rating...", end="", flush=True)
    for batch in batches:
        if not batch:
            # Nothing to rate; asking the LLM would only produce stray ratings.
            continue
        expected = len(batch)
        messages = [
            # The system prompt DO NOT DELETE
            {
                'role': 'system',
                'content': (
                    "Sentiment Analyzer: Output ONLY a list of floats between -1.0 and 1.0, like '0.0, 0.5, -0.5'."
                    "Chat messages are between <log> and </log>. Analyze the chat records and give rating to each message in order."
                    "Scoring: Hostile/Insulting (-0.8), Dismissive/Sarcastic (-0.5),"
                    "Apathetic/Short (-0.2), Neutral/Functional (0.0), Positive/Link (0.1-1.0)."
                    f"There are {expected} messages in total. Make sure you have {expected} ratings."
                    "Exactly a comma followed by a space between ratings. No prose. Double check amount."
                ),
            },
            # The user (DO NOT DELETE)
            {'role': 'user', 'content': readable_format(batch)},
        ]

        ratings = []
        for _ in range(max(1, max_attempts)):
            response = ollama.chat(model='llama3.2', messages=messages)
            ratings = process_llm_list(response['message']['content'])
            if len(ratings) == expected:
                break

        # Still miscounted after the retries: drop surplus ratings or pad the
        # missing ones with -0.5 (the value float_filter uses for unreadable
        # ratings) so later batches do not shift onto the wrong messages.
        ratings = ratings[:expected] + [-0.5] * (expected - len(ratings))
        rating_data.extend(ratings)
        print("\r" + f"{len(rating_data)}/{len(inputs)} messages rated", end="")
|
||
|
||
# SUMMARIZE EVERYTHING YAYAYAYAAAYAYA
|
||
def chat_summarizer(lists):
    """Ask the LLM to summarize the chat and store the text in ``chat_summary``.

    ``lists`` is rendered with ``readable_format`` and sent as the user
    message. The reply text replaces the global ``chat_summary``; nothing is
    returned.
    """
    # topicsummarizerlist is declared global but not used in this function.
    global chat_summary, topicsummarizerlist

    print("Summarizing chat...", end=" ", flush=True)

    conversation = [
        # The system prompt DO NOT DELETE
        {
            'role': 'system',
            'content': (
                'Chat Summarizer Tool: Sumarize given chat between <log> and </log>. Return brief summary and lists of topics discussed in order. Explain chat flow. Hypothesize user personalities. Use as few lines as possible.'
            ),
        },
        # The user (DO NOT DELETE)
        {'role': 'user', 'content': readable_format(lists)},
    ]
    reply = ollama.chat(model='llama3.2', messages=conversation)

    chat_summary = reply['message']['content']
    print("Success!")
|
||
|
||
# Start ITA Agent
|
||
def start_ita_agent(mood, summary):
    """Run an interactive question-and-answer session about the analyzed chat.

    The LLM is primed with the mood score and the chat summary, then answers
    the user's questions until the session ends. Replies are streamed to the
    console as they arrive, and the whole conversation is kept in memory so
    the model keeps its context.

    Args:
        mood: Average rating of the chat, inserted into the system prompt.
        summary: Chat summary text, inserted into the system prompt.

    The session ends on ``exit`` or ``quit`` (case and surrounding whitespace
    are ignored), on end-of-input, or on Ctrl-C at the prompt. Blank input is
    ignored instead of being sent to the model.
    """
    messages = [
        {
            'role': 'system',
            'content': (
                f"Sentiment Analysis Tool. Respond to the user's questions on analyzed chat."
                f"Analyzed data was: mood = {mood} with 0 being average."
                f"summary: {summary}. Respond in short and concise sentences."
            ),
        }
    ]

    print("Type 'exit' or 'quit' to end the session.\n")

    while True:
        try:
            user_input = input("You > ")
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C: leave quietly instead of with a traceback.
            print()
            break

        command = user_input.strip().lower()
        if command in ('exit', 'quit'):
            break
        if not command:
            continue

        # Add user input to memory
        messages.append({'role': 'user', 'content': user_input})

        print("ITA Analysis Agent > ", end="", flush=True)

        # Stream the response, echoing each chunk as it arrives
        reply_parts = []
        for chunk in ollama.chat(model='llama3.2', messages=messages, stream=True):
            piece = chunk['message']['content']
            print(piece, end="", flush=True)
            reply_parts.append(piece)

        # Add assistant reply to memory so it stays in context
        messages.append({'role': 'assistant', 'content': "".join(reply_parts)})
        print()  # Add a final newline for formatting
|
||
|
||
# Counts answers for statistics chart
|
||
def countanswers(list):
    """Count how many ratings are positive, negative and neutral.

    A rating of 0.3 or more is positive, -0.3 or less is negative, and
    everything else is neutral, so every rating is counted exactly once,
    including the boundary values 0.3 and -0.3.

    Args:
        list: Iterable of numeric ratings. The parameter name shadows the
            builtin and is kept only so existing callers keep working.

    Returns:
        ``[positive_count, negative_count, neutral_count]``.
    """
    goodanswers = 0
    badanswers = 0
    neutralanswers = 0
    for rating in list:
        if rating >= 0.3:
            goodanswers += 1
        elif rating <= -0.3:
            badanswers += 1
        else:
            neutralanswers += 1
    return [goodanswers, badanswers, neutralanswers]
|
||
|
||
|
||
# MAIN
|
||
# MAIN
if __name__ == "__main__":
    # ---- Choose the chat log ------------------------------------------------
    preset_logs = {"1": "goodlog.txt", "2": "badlog.txt", "3": "testlog.txt"}
    banner_rule = "=" * 80
    while True:
        print(
            f"\n{banner_rule}\n"
            f"ITA (In the Air) Mood Interpretor | Version: {cc_version} - P7MJ\n"
            f"{banner_rule}\n"
            "\n"
            "Choose a chat log to evaluate\n"
            "[1] goodlog.txt\n"
            "[2] badlog.txt\n"
            "[3] testlog.txt\n"
            "[4] specify"
        )
        menu_choice = input(" > ").strip()
        if menu_choice in preset_logs:
            log_to_choose = preset_logs[menu_choice]
            break
        if menu_choice == "4":
            log_to_choose = input("Text file name (w/extension) > ").strip()
            break
        print("Invalid Option!")

    # Start the timer
    start_time = time.perf_counter()

    # ---- Parse the chat log -------------------------------------------------
    # chatmessagelist must stay a module-level name: super_llm_rate reads it
    # as a global.
    print("Parsing chat message file...", end=" ")
    try:
        chatmessagelist = parse_chat_log(log_to_choose)
    except FileNotFoundError:
        input("File does not exist! Press enter to exit...")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as error:
        input(f"Could not read file ({error})! Press enter to exit...")
        sys.exit(1)
    if not chatmessagelist:
        # Without messages there is nothing to rate, average or display.
        input("No chat messages found in file! Press enter to exit...")
        sys.exit(1)
    print("Complete.")

    # ---- Choose the rating mode ---------------------------------------------
    # Only the three listed options are accepted; anything else (0, negative
    # numbers, "01", text) would otherwise skip rating entirely.
    while True:
        rating_mode = input("\nChoose rating mode:\n[1] Individual message rating\n[2] Batches of 5 rating\n[3] All-at-once rating\n > ").strip()
        if rating_mode in ("1", "2", "3"):
            break
        print("Not an integer or within range!")

    if rating_mode == "1":
        # Rate each message on its own. The whole [user, message] entry is
        # passed because llm_rate formats it as "user: message".
        for entry in chatmessagelist:
            llm_rate(entry, len(chatmessagelist))
    elif rating_mode == "2":
        batch_llm_rate(chatmessagelist)
    else:
        super_llm_rate(chatmessagelist)

    # ---- Normalize the ratings ----------------------------------------------
    rating_data[:] = [float_filter(raw) for raw in rating_data]
    # A rating without a matching message cannot be displayed; drop surplus
    # ones so the indices computed below always point at a real message.
    del rating_data[len(chatmessagelist):]
    if not rating_data:
        print()
        input("No ratings were produced! Press enter to exit...")
        sys.exit(1)

    # Calculates rating mean
    rating_mean = sum(rating_data) / len(rating_data)

    # ---- Most positive / most negative messages -----------------------------
    for rank, index in enumerate(get_n_largest_indices(rating_data, 5), start=1):
        posanslist.append(f"[{rank}] {chatmessagelist[index][0]}: {chatmessagelist[index][1]}")
    for rank, index in enumerate(get_n_smallest_indices(rating_data, 5), start=1):
        neganslist.append(f"[{rank}] {chatmessagelist[index][0]}: {chatmessagelist[index][1]}")

    # Number of results in each category: [positive, negative, neutral]
    category_counts = countanswers(rating_data)

    print()  # Add a line after the ratings

    # Summarizes everything (stores the text in the global chat_summary)
    chat_summarizer(chatmessagelist)

    # End the timer
    elapsed_time = time.perf_counter() - start_time

    # ---- Dashboard ----------------------------------------------------------
    data = {"Positive": category_counts[0], "Negative": category_counts[1], "Neutral": category_counts[2]}
    chart1 = termcharts.bar(data, title="Chart of message emotions", rich=True)

    stats_text = (
        f"STATISTICS\n\nRating Average: {round(rating_mean, 3)}\n"
        f"Positive Ratings: {category_counts[0]}\n"
        f"Negative Ratings: {category_counts[1]}\n"
        f"Neutral Ratings: {category_counts[2]}\n"
        f"Total messages analyzed: {len(rating_data)}\n"
        f"Total time elapsed: {elapsed_time:.4f} seconds"
    )

    # Raw f-string: the ASCII art is full of backslashes that must stay literal.
    credits_text = fr"""
 ___  _________  ________
|\  \|\___   ___\\   __  \
\ \  \|___ \  \_\ \  \|\  \
 \ \  \   \ \  \ \ \   __  \
  \ \  \   \ \  \ \ \  \ \  \
   \ \__\   \ \__\ \ \__\ \__\
    \|__|    \|__|  \|__|\|__|

©️2026 P7MJ
All Rights Reserved

In the Air, {cc_version} by P7MJ
Know the mood - and what to say next.
"""

    # Joining the lists (instead of indexing five fixed slots) keeps the panel
    # working for chats with fewer than five messages. Entries are cut to 200
    # characters each.
    most_positive = "\n".join(entry[:200] for entry in posanslist)
    most_negative = "\n".join(entry[:200] for entry in neganslist)
    outstanding_text = (
        f"OUTSTANDING MESSAGES\n\nMOST POSITIVE ANSWERS\n{most_positive}"
        f"\n\n\nMOST NEGATIVE ANSWERS\n{most_negative}"
    )

    # Define the layout
    layout = Layout()

    # 1. Split into top and bottom sections
    layout.split_column(
        Layout(name="upper", size=20),  # Fixed height for top section
        Layout(name="lower"),           # Bottom takes the remainder
    )

    # 2. Top: 1 part for the chart, 3 parts for the text panels
    layout["upper"].split_row(
        Layout(name="uleft", ratio=1),
        Layout(name="uright", ratio=3),
    )

    # Split the right side into two text panels
    layout["uright"].split_row(
        Layout(Panel(stats_text)),
        Layout(Panel(credits_text)),
    )

    # 3. Bottom: outstanding messages next to the chat summary
    layout["lower"].split_row(
        Layout(Panel(outstanding_text)),
        Layout(Panel(f"CHAT SUMMARY\n\n{chat_summary}")),
    )

    # 4. Fill the chart
    layout["uleft"].update(Panel(chart1))

    pr(layout)
    input("PRESS ENTER TO LAUNCH ITA CHAT AGENT")
    start_ita_agent(round(rating_mean, 3), chat_summary)
|